4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
53 #include <lustre_net.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
/* Global state for the shared OSC request pool used by bulk BRW RPCs. */
57 atomic_t osc_pool_req_count;
58 unsigned int osc_reqpool_maxreqcount;
59 struct ptlrpc_request_pool *osc_rq_pool;
61 /* max memory used for request pool, unit is MB */
62 static unsigned int osc_reqpool_mem_max = 5;
/* Read-only module parameter (mode 0444): set at module load time only. */
63 module_param(osc_reqpool_mem_max, uint, 0444);
/*
 * Per-request state stored in rq_async_args for asynchronous BRW (bulk
 * read/write) RPCs; filled when the request is prepared and consumed by
 * brw_interpret().  NOTE(review): some fields appear elided from this
 * excerpt (e.g. the obdo pointer used elsewhere as aa_oa).
 */
65 struct osc_brw_async_args {
71 struct brw_page **aa_ppga;
72 struct client_obd *aa_cli;
73 struct list_head aa_oaps;
74 struct list_head aa_exts;
75 struct cl_req *aa_clerq;
/* Grant-shrink RPCs reuse the BRW async-args layout under another name. */
78 #define osc_grant_args osc_brw_async_args
/*
 * Async-args for setattr-style RPCs (OST_SETATTR, OST_PUNCH); sa_upcall
 * is invoked from osc_setattr_interpret() when the reply arrives.
 */
80 struct osc_setattr_args {
82 obd_enqueue_update_f sa_upcall;
/* Async-args for OST_SYNC RPCs, consumed by osc_sync_interpret(). */
86 struct osc_fsync_args {
87 struct osc_object *fa_obj;
89 obd_enqueue_update_f fa_upcall;
/*
 * Async-args describing an in-flight LDLM enqueue issued by the OSC;
 * oa_upcall/oa_lockh deliver the enqueue result back to the caller.
 */
93 struct osc_enqueue_args {
94 struct obd_export *oa_exp;
95 enum ldlm_type oa_type;
96 enum ldlm_mode oa_mode;
98 osc_enqueue_upcall_f oa_upcall;
100 struct ost_lvb *oa_lvb;
101 struct lustre_handle oa_lockh;
/* oa_agl: presumably flags an asynchronous glimpse request -- confirm. */
102 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in this file. */
105 static void osc_release_ppga(struct brw_page **ppga, size_t count);
106 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Pack @oa into the OST_BODY field of @req's request buffer, converting
 * to wire format according to the import's connect data.
 */
109 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
111 struct ost_body *body;
113 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
116 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR: send @oa's identity to the OST and copy the
 * returned attributes back into @oa.  Returns 0 or a negative errno
 * (-EPROTO if the reply body cannot be unpacked).
 */
119 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
122 struct ptlrpc_request *req;
123 struct ost_body *body;
127 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
131 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: the request was never sent, so free (not finish) it. */
133 ptlrpc_request_free(req);
137 osc_pack_req_body(req, oa);
139 ptlrpc_request_set_replen(req);
/* Synchronous send: blocks until the reply (or an error) arrives. */
141 rc = ptlrpc_queue_wait(req);
145 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
147 GOTO(out, rc = -EPROTO);
149 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
/* Convert the wire obdo in the reply back into the caller's @oa. */
150 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Client-side block size is derived from the BRW size, not the reply. */
152 oa->o_blksize = cli_brw_size(exp->exp_obd);
153 oa->o_valid |= OBD_MD_FLBLKSZ;
157 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: push the attributes in @oa to the OST and
 * refresh @oa from the reply.  Returns 0 or a negative errno.
 */
162 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
165 struct ptlrpc_request *req;
166 struct ost_body *body;
/* The object group must always be set when talking to the OST. */
170 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
172 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
176 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* Never sent: free rather than finish. */
178 ptlrpc_request_free(req);
182 osc_pack_req_body(req, oa);
184 ptlrpc_request_set_replen(req);
186 rc = ptlrpc_queue_wait(req);
190 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
192 GOTO(out, rc = -EPROTO);
194 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
198 ptlrpc_req_finished(req);
/*
 * Reply interpreter shared by async setattr/punch RPCs: unpack the
 * reply obdo into sa->sa_oa, then invoke the caller's upcall with the
 * final status.
 */
203 static int osc_setattr_interpret(const struct lu_env *env,
204 struct ptlrpc_request *req,
205 struct osc_setattr_args *sa, int rc)
207 struct ost_body *body;
213 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215 GOTO(out, rc = -EPROTO);
217 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* Hand the final status to the caller-supplied upcall. */
220 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR.  Packs @oa, arranges for
 * osc_setattr_interpret() to run on reply, and either hands the request
 * to ptlrpcd (rqset == PTLRPCD_SET) or adds it to the caller's @rqset.
 * @upcall(@cookie, rc) is invoked with the final status.
 */
224 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
225 obd_enqueue_update_f upcall, void *cookie,
226 struct ptlrpc_request_set *rqset)
228 struct ptlrpc_request *req;
229 struct osc_setattr_args *sa;
234 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
238 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
240 ptlrpc_request_free(req);
244 osc_pack_req_body(req, oa);
246 ptlrpc_request_set_replen(req);
248 /* do mds to ost setattr asynchronously */
/* NOTE(review): this ptlrpcd_add_req() appears before the interpreter
 * is installed; its guarding condition (an "if (rqset == NULL)" branch
 * in comparable upstream code) seems elided from this excerpt --
 * confirm against the full source before relying on ordering here. */
250 /* Do not wait for response. */
251 ptlrpcd_add_req(req);
253 req->rq_interpret_reply =
254 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Async-args must fit in the space reserved inside the request. */
256 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
257 sa = ptlrpc_req_async_args(req);
259 sa->sa_upcall = upcall;
260 sa->sa_cookie = cookie;
262 if (rqset == PTLRPCD_SET)
263 ptlrpcd_add_req(req);
265 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.  Only echo-client objects come through this
 * path (the object sequence is asserted to be an echo sequence).  On
 * success @oa is refreshed from the reply.
 */
271 static int osc_create(const struct lu_env *env, struct obd_export *exp,
274 struct ptlrpc_request *req;
275 struct ost_body *body;
280 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
281 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
283 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
285 GOTO(out, rc = -ENOMEM);
287 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
289 ptlrpc_request_free(req);
293 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
296 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
298 ptlrpc_request_set_replen(req);
300 rc = ptlrpc_queue_wait(req);
304 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
306 GOTO(out_req, rc = -EPROTO);
308 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
309 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Client-side block size is derived from the BRW size, not the reply. */
311 oa->o_blksize = cli_brw_size(exp->exp_obd);
312 oa->o_valid |= OBD_MD_FLBLKSZ;
314 CDEBUG(D_HA, "transno: "LPD64"\n",
315 lustre_msg_get_transno(req->rq_repmsg))
317 ptlrpc_req_finished(req);
/*
 * Asynchronous OST_PUNCH (truncate/hole-punch).  The extent to punch is
 * carried inside @oa; completion is reported via @upcall(@cookie, rc)
 * from osc_setattr_interpret().
 */
322 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
323 obd_enqueue_update_f upcall, void *cookie,
324 struct ptlrpc_request_set *rqset)
326 struct ptlrpc_request *req;
327 struct osc_setattr_args *sa;
328 struct ost_body *body;
332 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
336 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
338 ptlrpc_request_free(req);
341 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
342 ptlrpc_at_set_req_timeout(req);
344 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
346 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
348 ptlrpc_request_set_replen(req);
/* Punch shares the setattr reply interpreter and async-args layout. */
350 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
351 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
352 sa = ptlrpc_req_async_args(req);
354 sa->sa_upcall = upcall;
355 sa->sa_cookie = cookie;
356 if (rqset == PTLRPCD_SET)
357 ptlrpcd_add_req(req);
359 ptlrpc_set_add_req(rqset, req);
/*
 * Reply interpreter for OST_SYNC: copy the reply obdo back to the
 * caller, refresh the osc object's blocks attribute from the reply
 * under the attr lock, then invoke the fsync upcall with the status.
 */
364 static int osc_sync_interpret(const struct lu_env *env,
365 struct ptlrpc_request *req,
368 struct osc_fsync_args *fa = arg;
369 struct ost_body *body;
370 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
371 unsigned long valid = 0;
372 struct cl_object *obj;
378 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
380 CERROR("can't unpack ost_body\n");
381 GOTO(out, rc = -EPROTO);
384 *fa->fa_oa = body->oa;
385 obj = osc2cl(fa->fa_obj);
387 /* Update osc object's blocks attribute */
388 cl_object_attr_lock(obj);
389 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
390 attr->cat_blocks = body->oa.o_blocks;
395 cl_object_attr_update(env, obj, attr, valid);
396 cl_object_attr_unlock(obj);
399 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Asynchronous OST_SYNC for @obj.  The range to sync rides in @oa's
 * size/blocks fields (see the in-body comment); the result is delivered
 * via @upcall(@cookie, rc) from osc_sync_interpret().
 */
403 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
404 obd_enqueue_update_f upcall, void *cookie,
405 struct ptlrpc_request_set *rqset)
407 struct obd_export *exp = osc_export(obj);
408 struct ptlrpc_request *req;
409 struct ost_body *body;
410 struct osc_fsync_args *fa;
414 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
418 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
420 ptlrpc_request_free(req);
424 /* overload the size and blocks fields in the oa with start/end */
425 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
427 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
429 ptlrpc_request_set_replen(req);
430 req->rq_interpret_reply = osc_sync_interpret;
432 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
433 fa = ptlrpc_req_async_args(req);
436 fa->fa_upcall = upcall;
437 fa->fa_cookie = cookie;
439 if (rqset == PTLRPCD_SET)
440 ptlrpcd_add_req(req);
442 ptlrpc_set_add_req(rqset, req);
447 /* Find and cancel locally locks matched by @mode in the resource found by
448 * @objid. Found locks are added into @cancel list. Returns the amount of
449 * locks added to @cancels list. */
450 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
451 struct list_head *cancels,
452 enum ldlm_mode mode, __u64 lock_flags)
454 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
455 struct ldlm_res_id res_id;
456 struct ldlm_resource *res;
460 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
461 * export) but disabled through procfs (flag in NS).
463 * This distinguishes from a case when ELC is not supported originally,
464 * when we still want to cancel locks in advance and just cancel them
465 * locally, without sending any RPC. */
466 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Build the LDLM resource name from the object id and look it up. */
469 ostid_build_res_name(&oa->o_oi, &res_id);
470 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
474 LDLM_RESOURCE_ADDREF(res);
475 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
476 lock_flags, 0, NULL);
477 LDLM_RESOURCE_DELREF(res);
478 ldlm_resource_putref(res);
/*
 * Reply interpreter for OST_DESTROY: drop the in-flight destroy count
 * and wake anyone throttled in osc_destroy().
 */
482 static int osc_destroy_interpret(const struct lu_env *env,
483 struct ptlrpc_request *req, void *data,
486 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
488 atomic_dec(&cli->cl_destroy_in_flight);
489 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a slot for one more destroy RPC.  Succeeds when the
 * incremented in-flight count stays within cl_max_rpcs_in_flight;
 * otherwise the increment is undone, and a waiter that raced with a
 * concurrent decrement may be woken so it can retry.
 */
493 static int osc_can_send_destroy(struct client_obd *cli)
495 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
496 cli->cl_max_rpcs_in_flight) {
497 /* The destroy request can be sent */
500 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
501 cli->cl_max_rpcs_in_flight) {
503 * The counter has been modified between the two atomic
506 wake_up(&cli->cl_destroy_waitq);
/*
 * OST_DESTROY: cancel conflicting local PW locks (early lock cancel),
 * throttle so at most cl_max_rpcs_in_flight destroys are outstanding,
 * then hand the request to ptlrpcd without waiting for the reply.
 */
511 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
514 struct client_obd *cli = &exp->exp_obd->u.cli;
515 struct ptlrpc_request *req;
516 struct ost_body *body;
517 struct list_head cancels = LIST_HEAD_INIT(cancels);
522 CDEBUG(D_INFO, "oa NULL\n");
/* Gather PW locks on the object; data is being destroyed, so cached
 * pages under those locks can be discarded (LDLM_FL_DISCARD_DATA). */
526 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
527 LDLM_FL_DISCARD_DATA);
529 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: release the lock references gathered above. */
531 ldlm_lock_list_put(&cancels, l_bl_ast, count);
535 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
538 ptlrpc_request_free(req);
542 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
543 ptlrpc_at_set_req_timeout(req);
545 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
547 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
549 ptlrpc_request_set_replen(req);
551 req->rq_interpret_reply = osc_destroy_interpret;
552 if (!osc_can_send_destroy(cli)) {
553 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
556 * Wait until the number of on-going destroy RPCs drops
557 * under max_rpc_in_flight
559 l_wait_event_exclusive(cli->cl_destroy_waitq,
560 osc_can_send_destroy(cli), &lwi);
563 /* Do not wait for response */
564 ptlrpcd_add_req(req);
/*
 * Fill @oa's dirty/undirty/grant/dropped fields with this client's
 * cache accounting so every RPC keeps the server informed of grant
 * state.  The CERROR branches are sanity checks for counters that look
 * inconsistent; they log but do not abort.
 */
568 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
571 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* Caller must not have populated the fields we are about to fill. */
573 LASSERT(!(oa->o_valid & bits));
/* All grant/dirty accounting is protected by cl_loi_list_lock. */
576 spin_lock(&cli->cl_loi_list_lock);
577 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
578 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
579 cli->cl_dirty_max_pages)) {
580 CERROR("dirty %lu - %lu > dirty_max %lu\n",
581 cli->cl_dirty_pages, cli->cl_dirty_transit,
582 cli->cl_dirty_max_pages);
584 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
585 atomic_long_read(&obd_dirty_transit_pages) >
586 (long)(obd_max_dirty_pages + 1))) {
587 /* The atomic_read() allowing the atomic_inc() are
588 * not covered by a lock thus they may safely race and trip
589 * this CERROR() unless we add in a small fudge factor (+1). */
590 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
591 cli_name(cli), atomic_long_read(&obd_dirty_pages),
592 atomic_long_read(&obd_dirty_transit_pages),
593 obd_max_dirty_pages);
595 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
597 CERROR("dirty %lu - dirty_max %lu too big???\n",
598 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
601 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
603 (cli->cl_max_rpcs_in_flight + 1);
604 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
/* Report available + reserved grant; "dropped" is lost grant, which is
 * zeroed once it has been reported to the server. */
607 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
608 oa->o_dropped = cli->cl_lost_grant;
609 cli->cl_lost_grant = 0;
610 spin_unlock(&cli->cl_loi_list_lock);
611 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
612 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink check cl_grant_shrink_interval from now. */
616 void osc_update_next_shrink(struct client_obd *cli)
618 cli->cl_next_shrink_grant =
619 cfs_time_shift(cli->cl_grant_shrink_interval);
620 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
621 cli->cl_next_shrink_grant);
/* Add @grant to the client's available grant under cl_loi_list_lock. */
624 static void __osc_update_grant(struct client_obd *cli, u64 grant)
626 spin_lock(&cli->cl_loi_list_lock);
627 cli->cl_avail_grant += grant;
628 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
631 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
633 if (body->oa.o_valid & OBD_MD_FLGRANT) {
634 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
635 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: osc_set_info_async() is defined later in this file. */
639 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
640 u32 keylen, void *key,
641 u32 vallen, void *val,
642 struct ptlrpc_request_set *set);
/*
 * Reply interpreter for a grant-shrink set_info RPC: on failure the
 * grant we tried to give back is restored locally; otherwise any grant
 * in the reply body is absorbed.
 */
644 static int osc_shrink_grant_interpret(const struct lu_env *env,
645 struct ptlrpc_request *req,
648 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
649 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
650 struct ost_body *body;
/* RPC failed: take back the grant we attempted to return. */
653 __osc_update_grant(cli, oa->o_grant);
657 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
659 osc_update_grant(cli, body);
/*
 * Piggy-back a grant shrink on an outgoing RPC: give back a quarter of
 * the available grant via @oa and mark it with OBD_FL_SHRINK_GRANT.
 */
665 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
667 spin_lock(&cli->cl_loi_list_lock);
668 oa->o_grant = cli->cl_avail_grant / 4;
669 cli->cl_avail_grant -= oa->o_grant;
670 spin_unlock(&cli->cl_loi_list_lock);
671 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
672 oa->o_valid |= OBD_MD_FLFLAGS;
675 oa->o_flags |= OBD_FL_SHRINK_GRANT;
676 osc_update_next_shrink(cli);
679 /* Shrink the current grant, either from some large amount to enough for a
680 * full set of in-flight RPCs, or if we have already shrunk to that limit
681 * then to enough for a single RPC. This avoids keeping more grant than
682 * needed, and avoids shrinking the grant piecemeal. */
683 static int osc_shrink_grant(struct client_obd *cli)
685 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
686 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
688 spin_lock(&cli->cl_loi_list_lock);
/* Already at (or below) the in-flight target: aim for one RPC's worth. */
689 if (cli->cl_avail_grant <= target_bytes)
690 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
691 spin_unlock(&cli->cl_loi_list_lock);
693 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Give grant back to the server until only @target_bytes remain.  The
 * excess is reported through a KEY_GRANT_SHRINK set_info RPC; on
 * submission failure the local grant is restored.  Never shrinks below
 * a single RPC's worth of grant.
 */
696 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
699 struct ost_body *body;
702 spin_lock(&cli->cl_loi_list_lock);
703 /* Don't shrink if we are already above or below the desired limit
704 * We don't want to shrink below a single RPC, as that will negatively
705 * impact block allocation and long-term performance. */
706 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
707 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
709 if (target_bytes >= cli->cl_avail_grant) {
710 spin_unlock(&cli->cl_loi_list_lock);
713 spin_unlock(&cli->cl_loi_list_lock);
719 osc_announce_cached(cli, &body->oa, 0);
721 spin_lock(&cli->cl_loi_list_lock);
/* o_grant carries the amount being handed back to the server. */
722 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
723 cli->cl_avail_grant = target_bytes;
724 spin_unlock(&cli->cl_loi_list_lock);
725 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
726 body->oa.o_valid |= OBD_MD_FLFLAGS;
727 body->oa.o_flags = 0;
729 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
730 osc_update_next_shrink(cli);
732 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
733 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
734 sizeof(*body), body, NULL);
/* Submission failed: restore the grant we tried to give back. */
736 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant: the server
 * must support OBD_CONNECT_GRANT_SHRINK, the shrink deadline must have
 * (nearly) passed, the import must be FULL, and the client must hold
 * more grant than a single RPC needs.
 */
741 static int osc_should_shrink_grant(struct client_obd *client)
743 cfs_time_t time = cfs_time_current();
744 cfs_time_t next_shrink = client->cl_next_shrink_grant;
746 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
747 OBD_CONNECT_GRANT_SHRINK) == 0)
/* Allow the check to fire slightly (5 ticks) before the deadline. */
750 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
751 /* Get the current RPC size directly, instead of going via:
752 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
753 * Keep comment here so that it can be found by searching. */
754 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
756 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
757 client->cl_avail_grant > brw_size)
760 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk the clients registered on this
 * timeout item and shrink grant for any that are due.
 */
765 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
767 struct client_obd *client;
769 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
770 if (osc_should_shrink_grant(client))
771 osc_shrink_grant(client);
/*
 * Register @client with the ptlrpc timeout machinery so the grant-
 * shrink callback runs every cl_grant_shrink_interval seconds.
 */
776 static int osc_add_shrink_grant(struct client_obd *client)
780 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
782 osc_grant_shrink_grant_cb, NULL,
783 &client->cl_grant_shrink_list);
785 CERROR("add grant client %s error %d\n", cli_name(client), rc);
788 CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
789 osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout list. */
793 static int osc_del_shrink_grant(struct client_obd *client)
795 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize grant accounting from the server's connect data (runs at
 * connect/reconnect time): set cl_avail_grant, derive the osc_extent
 * chunk size from the server block size, and enable periodic grant
 * shrinking when the server supports it.
 */
799 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
802 * ocd_grant is the total grant amount we're expect to hold: if we've
803 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
804 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
807 * race is tolerable here: if we're evicted, but imp_state already
808 * left EVICTED state, then cl_dirty_pages must be 0 already.
810 spin_lock(&cli->cl_loi_list_lock);
811 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
812 cli->cl_avail_grant = ocd->ocd_grant;
814 cli->cl_avail_grant = ocd->ocd_grant -
815 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
/* Negative grant means the server reported less than we have dirty. */
817 if (cli->cl_avail_grant < 0) {
818 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
819 cli_name(cli), cli->cl_avail_grant,
820 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
821 /* workaround for servers which do not have the patch from
823 cli->cl_avail_grant = ocd->ocd_grant;
826 /* determine the appropriate chunk size used by osc_extent. */
827 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
828 spin_unlock(&cli->cl_loi_list_lock);
830 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
831 "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant,
832 cli->cl_lost_grant, cli->cl_chunkbits);
834 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
835 list_empty(&cli->cl_grant_shrink_list))
836 osc_add_shrink_grant(cli);
839 /* We assume that the reason this OSC got a short read is because it read
840 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
841 * via the LOV, and it _knows_ it's reading inside the file, it's just that
842 * this stripe never got written at or beyond this stripe offset yet. */
843 static void handle_short_read(int nob_read, size_t page_count,
844 struct brw_page **pga)
849 /* skip bytes read OK */
850 while (nob_read > 0) {
851 LASSERT (page_count > 0);
853 if (pga[i]->count > nob_read) {
854 /* EOF inside this page */
855 ptr = kmap(pga[i]->pg) +
856 (pga[i]->off & ~PAGE_MASK);
/* Zero from the short-read point to the end of this page's extent. */
857 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
864 nob_read -= pga[i]->count;
869 /* zero remaining pages */
870 while (page_count-- > 0) {
871 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
872 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf RC vector in a BRW_WRITE reply: fail if the
 * vector is missing or short, propagate the first negative per-niobuf
 * rc, reject unexpected non-zero values, and verify the bulk actually
 * transferred the number of bytes we requested.
 */
878 static int check_write_rcs(struct ptlrpc_request *req,
879 int requested_nob, int niocount,
880 size_t page_count, struct brw_page **pga)
885 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
886 sizeof(*remote_rcs) *
888 if (remote_rcs == NULL) {
889 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
893 /* return error if any niobuf was in error */
894 for (i = 0; i < niocount; i++) {
895 if ((int)remote_rcs[i] < 0)
896 return(remote_rcs[i]);
898 if (remote_rcs[i] != 0) {
899 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
900 i, remote_rcs[i], req);
905 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
906 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
907 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages may be merged into one niobuf when they are file-
 * contiguous and their flags agree in every bit not known to be safe
 * to combine (unknown differing bits only produce a warning).
 */
914 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
916 if (p1->flag != p2->flag) {
917 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
918 OBD_BRW_SYNC | OBD_BRW_ASYNC |
919 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
921 /* warn if we try to combine flags that we don't know to be
923 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
924 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
925 "report this at https://jira.hpdd.intel.com/\n",
/* Mergeable iff p2 starts exactly where p1 ends in the file. */
931 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk checksum over the first @nob bytes of @pga using the
 * hash algorithm selected by @cksum_type.  Fault-injection hooks can
 * deliberately corrupt read data (receive-side check) or the returned
 * checksum (send-side check).  Returns the checksum, or a negative
 * errno if the hash cannot be initialized.
 */
934 static u32 osc_checksum_bulk(int nob, size_t pg_count,
935 struct brw_page **pga, int opc,
936 cksum_type_t cksum_type)
940 struct cfs_crypto_hash_desc *hdesc;
941 unsigned int bufsize;
943 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
945 LASSERT(pg_count > 0);
947 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
949 CERROR("Unable to initialize checksum hash %s\n",
950 cfs_crypto_hash_name(cfs_alg));
951 return PTR_ERR(hdesc);
954 while (nob > 0 && pg_count > 0) {
/* Only hash the bytes of this page that are part of the transfer. */
955 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
957 /* corrupt the data before we compute the checksum, to
958 * simulate an OST->client data error */
959 if (i == 0 && opc == OST_READ &&
960 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
961 unsigned char *ptr = kmap(pga[i]->pg);
962 int off = pga[i]->off & ~PAGE_MASK;
964 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
967 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
968 pga[i]->off & ~PAGE_MASK,
970 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
971 (int)(pga[i]->off & ~PAGE_MASK));
973 nob -= pga[i]->count;
978 bufsize = sizeof(cksum);
979 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
981 /* For sending we only compute the wrong checksum instead
982 * of corrupting the data so it is still correct on a redo */
983 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build (but do not send) a BRW read/write RPC covering @page_count
 * pages in @pga: allocate the request (writes draw from the shared
 * request pool), attach the bulk descriptor, coalesce file-contiguous
 * pages into niobufs, announce the client's cached/dirty/grant state,
 * and optionally attach a bulk checksum.  On success *reqp holds the
 * prepared request with its async-args primed for brw_interpret().
 */
990 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
991 u32 page_count, struct brw_page **pga,
992 struct ptlrpc_request **reqp, int resend)
994 struct ptlrpc_request *req;
995 struct ptlrpc_bulk_desc *desc;
996 struct ost_body *body;
997 struct obd_ioobj *ioobj;
998 struct niobuf_remote *niobuf;
999 int niocount, i, requested_nob, opc, rc;
1000 struct osc_brw_async_args *aa;
1001 struct req_capsule *pill;
1002 struct brw_page *pg_prev;
/* Fault-injection points for testing BRW preparation failures. */
1005 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1006 RETURN(-ENOMEM); /* Recoverable */
1007 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1008 RETURN(-EINVAL); /* Fatal */
/* Writes allocate from the shared pool so they can make progress even
 * under memory pressure; reads allocate normally. */
1010 if ((cmd & OBD_BRW_WRITE) != 0) {
1012 req = ptlrpc_request_alloc_pool(cli->cl_import,
1014 &RQF_OST_BRW_WRITE);
1017 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Count niobufs: adjacent mergeable pages share one niobuf. */
1022 for (niocount = i = 1; i < page_count; i++) {
1023 if (!can_merge_pages(pga[i - 1], pga[i]))
1027 pill = &req->rq_pill;
1028 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1030 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1031 niocount * sizeof(*niobuf));
1033 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1035 ptlrpc_request_free(req);
1038 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1039 ptlrpc_at_set_req_timeout(req);
1040 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1042 req->rq_no_retry_einprogress = 1;
1044 desc = ptlrpc_prep_bulk_imp(req, page_count,
1045 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1046 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1047 PTLRPC_BULK_PUT_SINK) |
1048 PTLRPC_BULK_BUF_KIOV,
1050 &ptlrpc_bulk_kiov_pin_ops);
1053 GOTO(out, rc = -ENOMEM);
1054 /* NB request now owns desc and will free it when it gets freed */
1056 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1057 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1058 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1059 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1061 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1063 obdo_to_ioobj(oa, ioobj);
1064 ioobj->ioo_bufcnt = niocount;
1065 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1066 * that might be send for this request. The actual number is decided
1067 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1068 * "max - 1" for old client compatibility sending "0", and also so the
1069 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1070 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1071 LASSERT(page_count > 0);
/* Walk the page array: add each page to the bulk and either extend the
 * current niobuf (mergeable) or start a new one. */
1073 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1074 struct brw_page *pg = pga[i];
1075 int poff = pg->off & ~PAGE_MASK;
1077 LASSERT(pg->count > 0);
1078 /* make sure there is no gap in the middle of page array */
1079 LASSERTF(page_count == 1 ||
1080 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1081 ergo(i > 0 && i < page_count - 1,
1082 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1083 ergo(i == page_count - 1, poff == 0)),
1084 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1085 i, page_count, pg, pg->off, pg->count);
/* Pages must be supplied in strictly ascending file order. */
1086 LASSERTF(i == 0 || pg->off > pg_prev->off,
1087 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1088 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1090 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1091 pg_prev->pg, page_private(pg_prev->pg),
1092 pg_prev->pg->index, pg_prev->off);
1093 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1094 (pg->flag & OBD_BRW_SRVLOCK));
1096 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1097 requested_nob += pg->count;
1099 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1101 niobuf->rnb_len += pg->count;
1103 niobuf->rnb_offset = pg->off;
1104 niobuf->rnb_len = pg->count;
1105 niobuf->rnb_flags = pg->flag;
/* Sanity check: we must have filled exactly niocount niobufs. */
1110 LASSERTF((void *)(niobuf - niocount) ==
1111 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1112 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1113 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1115 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* On a resend, tell the server so it can detect duplicate execution. */
1117 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1118 body->oa.o_valid |= OBD_MD_FLFLAGS;
1119 body->oa.o_flags = 0;
1121 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1124 if (osc_should_shrink_grant(cli))
1125 osc_shrink_grant_local(cli, &body->oa);
1127 /* size[REQ_REC_OFF] still sizeof (*body) */
1128 if (opc == OST_WRITE) {
1129 if (cli->cl_checksum &&
1130 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1131 /* store cl_cksum_type in a local variable since
1132 * it can be changed via lprocfs */
1133 cksum_type_t cksum_type = cli->cl_cksum_type;
1135 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1136 oa->o_flags &= OBD_FL_LOCAL_MASK;
1137 body->oa.o_flags = 0;
1139 body->oa.o_flags |= cksum_type_pack(cksum_type);
1140 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1141 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1145 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1147 /* save this in 'oa', too, for later checking */
1148 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1149 oa->o_flags |= cksum_type_pack(cksum_type);
1151 /* clear out the checksum flag, in case this is a
1152 * resend but cl_checksum is no longer set. b=11238 */
1153 oa->o_valid &= ~OBD_MD_FLCKSUM;
1155 oa->o_cksum = body->oa.o_cksum;
1156 /* 1 RC per niobuf */
1157 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1158 sizeof(__u32) * niocount);
/* Read path: only request that the server checksum its reply data. */
1160 if (cli->cl_checksum &&
1161 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1162 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1163 body->oa.o_flags = 0;
1164 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1165 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1168 ptlrpc_request_set_replen(req);
1170 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1171 aa = ptlrpc_req_async_args(req);
1173 aa->aa_requested_nob = requested_nob;
1174 aa->aa_nio_count = niocount;
1175 aa->aa_page_count = page_count;
1179 INIT_LIST_HEAD(&aa->aa_oaps);
1182 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1183 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1184 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1185 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1189 ptlrpc_req_finished(req);
/*
 * A BRW_WRITE reply carried a checksum that differs from the one we
 * sent: recompute the checksum over the local pages to classify the
 * failure (server used a different algorithm, data changed on the
 * client after checksumming, changed in transit, or both) and log a
 * console error with the affected object/extent.  Returns 0 when the
 * server checksum matches; the mismatch return value lies outside this
 * excerpt.
 */
1193 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1194 __u32 client_cksum, __u32 server_cksum, int nob,
1195 size_t page_count, struct brw_page **pga,
1196 cksum_type_t client_cksum_type)
1200 cksum_type_t cksum_type;
1202 if (server_cksum == client_cksum) {
1203 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute with the algorithm the *server* reported it used. */
1207 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1209 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1212 if (cksum_type != client_cksum_type)
1213 msg = "the server did not use the checksum type specified in "
1214 "the original request - likely a protocol problem";
1215 else if (new_cksum == server_cksum)
1216 msg = "changed on the client after we checksummed it - "
1217 "likely false positive due to mmap IO (bug 11742)";
1218 else if (new_cksum == client_cksum)
1219 msg = "changed in transit before arrival at OST";
1221 msg = "changed in transit AND doesn't match the original - "
1222 "likely false positive due to mmap IO (bug 11742)";
1224 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1225 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1226 msg, libcfs_nid2str(peer->nid),
1227 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1228 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1229 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1230 POSTID(&oa->o_oi), pga[0]->off,
1231 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1232 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1233 "client csum now %x\n", client_cksum, client_cksum_type,
1234 server_cksum, cksum_type, new_cksum);
/*
 * Common completion processing for a BRW (bulk read/write) reply:
 * unpack the reply body, update per-uid/gid quota flags and grant,
 * verify bulk checksums, handle short reads, and copy the
 * server-updated obdo back for the caller.
 */
1238 /* Note rc enters this function as number of bytes transferred */
1239 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1241 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1242 const lnet_process_id_t *peer =
1243 &req->rq_import->imp_connection->c_peer;
1244 struct client_obd *cli = aa->aa_cli;
1245 struct ost_body *body;
1246 u32 client_cksum = 0;
/* -EDQUOT replies still carry a body whose quota flags must be processed */
1249 if (rc < 0 && rc != -EDQUOT) {
1250 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1254 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1255 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1257 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1261 /* set/clear over quota flag for a uid/gid */
1262 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1263 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1264 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1266 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1267 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1269 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1272 osc_update_grant(cli, body);
/* remember the checksum we computed at send time, before oa is overwritten */
1277 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1278 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* write path: the server echoes a checksum over the data it received */
1280 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1282 CERROR("Unexpected +ve rc %d\n", rc);
1285 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1287 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1290 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1291 check_write_checksum(&body->oa, peer, client_cksum,
1292 body->oa.o_cksum, aa->aa_requested_nob,
1293 aa->aa_page_count, aa->aa_ppga,
1294 cksum_type_unpack(aa->aa_oa->o_flags)))
1297 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1298 aa->aa_page_count, aa->aa_ppga);
1302 /* The rest of this function executes only for OST_READs */
1304 /* if unwrap_bulk failed, return -EAGAIN to retry */
1305 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1307 GOTO(out, rc = -EAGAIN);
/* rc is bytes transferred; sanity-check it against what we asked for */
1309 if (rc > aa->aa_requested_nob) {
1310 CERROR("Unexpected rc %d (%d requested)\n", rc,
1311 aa->aa_requested_nob);
1315 if (rc != req->rq_bulk->bd_nob_transferred) {
1316 CERROR ("Unexpected rc %d (%d transferred)\n",
1317 rc, req->rq_bulk->bd_nob_transferred);
/* a read past EOF returns fewer bytes; zero-fill the remainder */
1321 if (rc < aa->aa_requested_nob)
1322 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* verify the read data against the server-supplied checksum */
1324 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1325 static int cksum_counter;
1326 u32 server_cksum = body->oa.o_cksum;
1329 cksum_type_t cksum_type;
1331 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1332 body->oa.o_flags : 0);
1333 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1334 aa->aa_ppga, OST_READ,
/* data may have come through a router rather than directly from the OST */
1337 if (peer->nid != req->rq_bulk->bd_sender) {
1339 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1342 if (server_cksum != client_cksum) {
1343 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1344 "%s%s%s inode "DFID" object "DOSTID
1345 " extent ["LPU64"-"LPU64"]\n",
1346 req->rq_import->imp_obd->obd_name,
1347 libcfs_nid2str(peer->nid),
1349 body->oa.o_valid & OBD_MD_FLFID ?
1350 body->oa.o_parent_seq : (__u64)0,
1351 body->oa.o_valid & OBD_MD_FLFID ?
1352 body->oa.o_parent_oid : 0,
1353 body->oa.o_valid & OBD_MD_FLFID ?
1354 body->oa.o_parent_ver : 0,
1355 POSTID(&body->oa.o_oi),
1356 aa->aa_ppga[0]->off,
1357 aa->aa_ppga[aa->aa_page_count-1]->off +
1358 aa->aa_ppga[aa->aa_page_count-1]->count -
1360 CERROR("client %x, server %x, cksum_type %x\n",
1361 client_cksum, server_cksum, cksum_type);
1363 aa->aa_oa->o_cksum = client_cksum;
1367 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* client asked for a checksum but the server didn't send one */
1370 } else if (unlikely(client_cksum)) {
1371 static int cksum_missed;
/* rate-limit: log only when cksum_missed is a power of two */
1374 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1375 CERROR("Checksum %u requested from %s but not sent\n",
1376 cksum_missed, libcfs_nid2str(peer->nid));
/* merge the server-updated fields of the reply obdo into the caller's */
1382 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1383 aa->aa_oa, &body->oa);
/*
 * Resend a BRW request that failed with a recoverable error.
 *
 * Builds a fresh request with the same command, pages and obdo
 * (osc_brw_prep_request), moves the page/extent lists and async args
 * over from the old request, transfers per-page request references,
 * and queues the new request via ptlrpcd.
 */
1388 static int osc_brw_redo_request(struct ptlrpc_request *request,
1389 struct osc_brw_async_args *aa, int rc)
1391 struct ptlrpc_request *new_req;
1392 struct osc_brw_async_args *new_aa;
1393 struct osc_async_page *oap;
/* -EINPROGRESS resends are routine, log quietly; anything else is an error */
1396 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1397 "redo for recoverable error %d", rc);
1399 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1400 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1401 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1402 aa->aa_ppga, &new_req, 1);
/* abort the resend if any page's original request was interrupted */
1406 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1407 if (oap->oap_request != NULL) {
1408 LASSERTF(request == oap->oap_request,
1409 "request %p != oap_request %p\n",
1410 request, oap->oap_request);
1411 if (oap->oap_interrupted) {
1412 ptlrpc_req_finished(new_req);
1417 /* New request takes over pga and oaps from old request.
1418 * Note that copying a list_head doesn't work, need to move it... */
1420 new_req->rq_interpret_reply = request->rq_interpret_reply;
1421 new_req->rq_async_args = request->rq_async_args;
1422 new_req->rq_commit_cb = request->rq_commit_cb;
1423 /* cap resend delay to the current request timeout, this is similar to
1424 * what ptlrpc does (see after_reply()) */
1425 if (aa->aa_resends > new_req->rq_timeout)
1426 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1428 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1429 new_req->rq_generation_set = 1;
1430 new_req->rq_import_generation = request->rq_import_generation;
1432 new_aa = ptlrpc_req_async_args(new_req);
/* splice, don't copy: list_heads must be moved, see comment above */
1434 INIT_LIST_HEAD(&new_aa->aa_oaps);
1435 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1436 INIT_LIST_HEAD(&new_aa->aa_exts);
1437 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1438 new_aa->aa_resends = aa->aa_resends;
/* swap each page's request reference from the old request to the new one */
1440 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1441 if (oap->oap_request) {
1442 ptlrpc_req_finished(oap->oap_request);
1443 oap->oap_request = ptlrpc_request_addref(new_req);
1447 /* XXX: This code will run into problem if we're going to support
1448 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1449 * and wait for all of them to be finished. We should inherit request
1450 * set from old request. */
1451 ptlrpcd_add_req(new_req);
1453 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. its an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until its '1' and the array is sorted.
 */
/* Sort @array of @num brw_page pointers by ascending ->off (shellsort). */
1464 static void sort_brw_pages(struct brw_page **array, int num)
1467 struct brw_page *tmp;
/* grow the stride using the 3h+1 sequence, then shrink it back down */
1471 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* insertion sort among elements that are 'stride' apart */
1476 for (i = stride ; i < num ; i++) {
1479 while (j >= stride && array[j - stride]->off > tmp->off) {
1480 array[j] = array[j - stride];
1485 } while (stride > 1);
/* Free a brw_page pointer array of @count entries (the pages themselves
 * are owned elsewhere; only the pointer array is released here). */
1488 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1490 LASSERT(ppga != NULL);
1491 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs.
 *
 * Finishes the request (osc_brw_fini_request), resends on recoverable
 * errors, and on completion updates the cached cl_object attributes
 * (blocks/times, plus size and KMS for writes), finishes all extents,
 * releases the page array, and adjusts the read/write in-flight counts.
 */
1494 static int brw_interpret(const struct lu_env *env,
1495 struct ptlrpc_request *req, void *data, int rc)
1497 struct osc_brw_async_args *aa = data;
1498 struct osc_extent *ext;
1499 struct osc_extent *tmp;
1500 struct client_obd *cli = aa->aa_cli;
1503 rc = osc_brw_fini_request(req, rc);
1504 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1505 /* When server return -EINPROGRESS, client should always retry
1506 * regardless of the number of times the bulk was resent already. */
1507 if (osc_recoverable_error(rc)) {
/* do not resend across an eviction: the import generation changed */
1508 if (req->rq_import_generation !=
1509 req->rq_import->imp_generation) {
1510 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1511 ""DOSTID", rc = %d.\n",
1512 req->rq_import->imp_obd->obd_name,
1513 POSTID(&aa->aa_oa->o_oi), rc);
1514 } else if (rc == -EINPROGRESS ||
1515 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1516 rc = osc_brw_redo_request(req, aa, rc);
1518 CERROR("%s: too many resent retries for object: "
1519 ""LPU64":"LPU64", rc = %d.\n",
1520 req->rq_import->imp_obd->obd_name,
1521 POSTID(&aa->aa_oa->o_oi), rc);
1526 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* propagate server-returned attributes into the cl_object cache */
1531 struct obdo *oa = aa->aa_oa;
1532 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1533 unsigned long valid = 0;
1534 struct cl_object *obj;
1535 struct osc_async_page *last;
1537 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1538 obj = osc2cl(last->oap_obj);
1540 cl_object_attr_lock(obj);
1541 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1542 attr->cat_blocks = oa->o_blocks;
1543 valid |= CAT_BLOCKS;
1545 if (oa->o_valid & OBD_MD_FLMTIME) {
1546 attr->cat_mtime = oa->o_mtime;
1549 if (oa->o_valid & OBD_MD_FLATIME) {
1550 attr->cat_atime = oa->o_atime;
1553 if (oa->o_valid & OBD_MD_FLCTIME) {
1554 attr->cat_ctime = oa->o_ctime;
/* writes may extend file size and known minimum size (KMS) */
1558 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1559 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1560 loff_t last_off = last->oap_count + last->oap_obj_off +
1563 /* Change file size if this is an out of quota or
1564 * direct IO write and it extends the file size */
1565 if (loi->loi_lvb.lvb_size < last_off) {
1566 attr->cat_size = last_off;
1569 /* Extend KMS if it's not a lockless write */
1570 if (loi->loi_kms < last_off &&
1571 oap2osc_page(last)->ops_srvlock == 0) {
1572 attr->cat_kms = last_off;
1578 cl_object_attr_update(env, obj, attr, valid);
1579 cl_object_attr_unlock(obj);
1581 OBDO_FREE(aa->aa_oa);
/* successful writes leave unstable (not-yet-committed) pages behind */
1583 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1584 osc_inc_unstable_pages(req);
1586 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1587 list_del_init(&ext->oe_link);
1588 osc_extent_finish(env, ext, 1, rc);
1590 LASSERT(list_empty(&aa->aa_exts));
1591 LASSERT(list_empty(&aa->aa_oaps));
1593 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1594 req->rq_bulk->bd_nob_transferred);
1595 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1596 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1598 spin_lock(&cli->cl_loi_list_lock);
1599 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1600 * is called so we know whether to go to sync BRWs or wait for more
1601 * RPCs to complete */
1602 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1603 cli->cl_w_in_flight--;
1605 cli->cl_r_in_flight--;
1606 osc_wake_cache_waiters(cli);
1607 spin_unlock(&cli->cl_loi_list_lock);
1609 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW requests: mark the request committed and
 * release its unstable-page accounting exactly once (the rq_unstable
 * flag, checked under rq_lock, arbitrates the race described below).
 */
1613 static void brw_commit(struct ptlrpc_request *req)
1615 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1616 * this called via the rq_commit_cb, I need to ensure
1617 * osc_dec_unstable_pages is still called. Otherwise unstable
1618 * pages may be leaked. */
1619 spin_lock(&req->rq_lock);
1620 if (likely(req->rq_unstable)) {
1621 req->rq_unstable = 0;
1622 spin_unlock(&req->rq_lock);
/* decrement outside the spinlock */
1624 osc_dec_unstable_pages(req);
1626 req->rq_committed = 1;
1627 spin_unlock(&req->rq_lock);
/*
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 */
1636 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1637 struct list_head *ext_list, int cmd)
1639 struct ptlrpc_request *req = NULL;
1640 struct osc_extent *ext;
1641 struct brw_page **pga = NULL;
1642 struct osc_brw_async_args *aa = NULL;
1643 struct obdo *oa = NULL;
1644 struct osc_async_page *oap;
1645 struct osc_async_page *tmp;
1646 struct cl_req *clerq = NULL;
1647 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1649 struct cl_req_attr *crattr = NULL;
1650 loff_t starting_offset = OBD_OBJECT_EOF;
1651 loff_t ending_offset = 0;
1655 bool soft_sync = false;
1658 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1659 struct ost_body *body;
1661 LASSERT(!list_empty(ext_list));
1663 /* add pages into rpc_list to build BRW rpc */
1664 list_for_each_entry(ext, ext_list, oe_link) {
1665 LASSERT(ext->oe_state == OES_RPC);
1666 mem_tight |= ext->oe_memalloc;
1667 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1669 list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* track the [starting_offset, ending_offset) span covered by the RPC */
1670 if (starting_offset == OBD_OBJECT_EOF ||
1671 starting_offset > oap->oap_obj_off)
1672 starting_offset = oap->oap_obj_off;
1674 LASSERT(oap->oap_page_off == 0);
1675 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1676 ending_offset = oap->oap_obj_off +
1679 LASSERT(oap->oap_page_off + oap->oap_count ==
1684 soft_sync = osc_over_unstable_soft_limit(cli);
/* under memory pressure, allow allocations from emergency pools */
1686 mpflag = cfs_memory_pressure_get_and_set();
1688 OBD_ALLOC(crattr, sizeof(*crattr));
1690 GOTO(out, rc = -ENOMEM);
1692 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1694 GOTO(out, rc = -ENOMEM);
1698 GOTO(out, rc = -ENOMEM);
/* fill the brw_page array and attach each page to the cl_req */
1701 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1702 struct cl_page *page = oap2cl_page(oap);
1703 if (clerq == NULL) {
1704 clerq = cl_req_alloc(env, page, crt,
1705 1 /* only 1-object rpcs for now */);
1707 GOTO(out, rc = PTR_ERR(clerq));
1710 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1712 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1713 pga[i] = &oap->oap_brw_page;
1714 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1715 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1716 pga[i]->pg, page_index(oap->oap_page), oap,
1719 cl_req_page_add(env, clerq, page);
1722 /* always get the data for the obdo for the rpc */
1723 LASSERT(clerq != NULL);
1724 crattr->cra_oa = oa;
1725 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1727 rc = cl_req_prep(env, clerq);
1729 CERROR("cl_req_prep failed: %d\n", rc);
/* sort pages by offset so the target allocates disk blocks in order */
1733 sort_brw_pages(pga, page_count);
1734 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1736 CERROR("prep_req failed: %d\n", rc);
1740 req->rq_commit_cb = brw_commit;
1741 req->rq_interpret_reply = brw_interpret;
1744 req->rq_memalloc = 1;
1746 /* Need to update the timestamps after the request is built in case
1747 * we race with setattr (locally or in queue at OST). If OST gets
1748 * later setattr before earlier BRW (as determined by the request xid),
1749 * the OST will not use BRW timestamps. Sadly, there is no obvious
1750 * way to do this in a single call. bug 10150 */
1751 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1752 crattr->cra_oa = &body->oa;
1753 cl_req_attr_set(env, clerq, crattr,
1754 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1756 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* hand the page/extent lists over to the request's async args */
1758 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1759 aa = ptlrpc_req_async_args(req);
1760 INIT_LIST_HEAD(&aa->aa_oaps);
1761 list_splice_init(&rpc_list, &aa->aa_oaps);
1762 INIT_LIST_HEAD(&aa->aa_exts);
1763 list_splice_init(ext_list, &aa->aa_exts);
1764 aa->aa_clerq = clerq;
1766 /* queued sync pages can be torn down while the pages
1767 * were between the pending list and the rpc */
1769 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1770 /* only one oap gets a request reference */
1773 if (oap->oap_interrupted && !req->rq_intr) {
1774 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1776 ptlrpc_mark_interrupted(req);
1780 tmp->oap_request = ptlrpc_request_addref(req);
/* update RPC-in-flight counters and lprocfs histograms */
1782 spin_lock(&cli->cl_loi_list_lock);
1783 starting_offset >>= PAGE_CACHE_SHIFT;
1784 if (cmd == OBD_BRW_READ) {
1785 cli->cl_r_in_flight++;
1786 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1787 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1788 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1789 starting_offset + 1);
1791 cli->cl_w_in_flight++;
1792 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1793 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1794 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1795 starting_offset + 1);
1797 spin_unlock(&cli->cl_loi_list_lock);
1799 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1800 page_count, aa, cli->cl_r_in_flight,
1801 cli->cl_w_in_flight);
1803 ptlrpcd_add_req(req);
/* error path: undo partial setup and fail all extents */
1809 cfs_memory_pressure_restore(mpflag);
1812 OBD_FREE(crattr, sizeof(*crattr));
1815 LASSERT(req == NULL);
1820 OBD_FREE(pga, sizeof(*pga) * page_count);
1821 /* this should happen rarely and is pretty bad, it makes the
1822 * pending list not follow the dirty order */
1823 while (!list_empty(ext_list)) {
1824 ext = list_entry(ext_list->next, struct osc_extent,
1826 list_del_init(&ext->oe_link);
1827 osc_extent_finish(env, ext, 0, rc);
1829 if (clerq && !IS_ERR(clerq))
1830 cl_req_completion(env, clerq, rc);
/*
 * Attach OSC private data (@einfo->ei_cbdata) to a DLM lock if it has
 * none, after asserting the lock's callbacks match what we enqueue with.
 *
 * NOTE(review): the return statements are not visible in this extract;
 * the caller treats a zero return as "data mismatch" -- confirm against
 * the full source.
 */
1835 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1836 struct ldlm_enqueue_info *einfo)
1838 void *data = einfo->ei_cbdata;
1841 LASSERT(lock != NULL);
/* a lock we match must carry exactly our AST callbacks and type */
1842 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1843 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1844 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1845 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1847 lock_res_and_lock(lock);
/* only set l_ast_data when unset; then verify it matches ours */
1849 if (lock->l_ast_data == NULL)
1850 lock->l_ast_data = data;
1851 if (lock->l_ast_data == data)
1854 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve
 * @lockh to a lock, attach the enqueue data, and drop the reference.
 * A NULL lock here usually means the client was evicted.
 */
1859 static int osc_set_data_with_check(struct lustre_handle *lockh,
1860 struct ldlm_enqueue_info *einfo)
1862 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1866 set = osc_set_lock_data_with_check(lock, einfo);
1867 LDLM_LOCK_PUT(lock);
1869 CERROR("lockh %p, data %p - client evicted?\n",
1870 lockh, einfo->ei_cbdata);
/*
 * Common completion for a lock enqueue: translate an intent-aborted
 * reply into its embedded status, mark the LVB ready, invoke the
 * caller's upcall with the final error code, and drop the enqueue
 * reference on the lock.
 */
1874 static int osc_enqueue_fini(struct ptlrpc_request *req,
1875 osc_enqueue_upcall_f upcall, void *cookie,
1876 struct lustre_handle *lockh, enum ldlm_mode mode,
1877 __u64 *flags, int agl, int errcode)
1879 bool intent = *flags & LDLM_FL_HAS_INTENT;
1883 /* The request was created before ldlm_cli_enqueue call. */
1884 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1885 struct ldlm_reply *rep;
1887 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1888 LASSERT(rep != NULL);
/* the real result of an aborted intent lives in lock_policy_res1 */
1890 rep->lock_policy_res1 =
1891 ptlrpc_status_ntoh(rep->lock_policy_res1);
1892 if (rep->lock_policy_res1)
1893 errcode = rep->lock_policy_res1;
1895 *flags |= LDLM_FL_LVB_READY;
1896 } else if (errcode == ELDLM_OK) {
1897 *flags |= LDLM_FL_LVB_READY;
1900 /* Call the update callback. */
1901 rc = (*upcall)(cookie, lockh, errcode);
1903 /* release the reference taken in ldlm_cli_enqueue() */
1904 if (errcode == ELDLM_LOCK_MATCHED)
1906 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1907 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for an asynchronous lock enqueue: finish the
 * LDLM side (ldlm_cli_enqueue_fini) and then the OSC side
 * (osc_enqueue_fini), keeping an extra lock reference across the upcall
 * so a blocking AST cannot arrive before the upcall runs.
 */
1912 static int osc_enqueue_interpret(const struct lu_env *env,
1913 struct ptlrpc_request *req,
1914 struct osc_enqueue_args *aa, int rc)
1916 struct ldlm_lock *lock;
1917 struct lustre_handle *lockh = &aa->oa_lockh;
1918 enum ldlm_mode mode = aa->oa_mode;
1919 struct ost_lvb *lvb = aa->oa_lvb;
1920 __u32 lvb_len = sizeof(*lvb);
1925 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1927 lock = ldlm_handle2lock(lockh);
1928 LASSERTF(lock != NULL,
1929 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1930 lockh->cookie, req, aa);
1932 /* Take an additional reference so that a blocking AST that
1933 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1934 * to arrive after an upcall has been executed by
1935 * osc_enqueue_fini(). */
1936 ldlm_lock_addref(lockh, mode);
1938 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1939 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1941 /* Let CP AST to grant the lock first. */
1942 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL enqueues carry no lvb/flags; use local storage for the fini call */
1945 LASSERT(aa->oa_lvb == NULL);
1946 LASSERT(aa->oa_flags == NULL);
1947 aa->oa_flags = &flags;
1950 /* Complete obtaining the lock procedure. */
1951 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1952 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1954 /* Complete osc stuff. */
1955 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1956 aa->oa_flags, aa->oa_agl, rc);
1958 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
1960 ldlm_lock_decref(lockh, mode);
1961 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to mean "send
 * via the ptlrpcd daemon" instead of a real set (compared by value in
 * osc_enqueue_base; never dereferenced). */
1965 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
1967 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
1968 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
1969 * other synchronous requests, however keeping some locks and trying to obtain
1970 * others may take a considerable amount of time in a case of ost failure; and
1971 * when other sync requests do not get released lock from a client, the client
1972 * is evicted from the cluster -- such scenarious make the life difficult, so
1973 * release locks just after they are obtained. */
/*
 * Enqueue (or match) an extent lock on @res_id.  First tries to match an
 * existing compatible lock (reads may match PW locks); on a hit the
 * upcall is invoked with ELDLM_LOCK_MATCHED.  Otherwise a new enqueue is
 * sent, asynchronously via @rqset/ptlrpcd or synchronously.  @agl marks
 * speculative (async glimpse) enqueues which need no LVB result.
 */
1974 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
1975 __u64 *flags, union ldlm_policy_data *policy,
1976 struct ost_lvb *lvb, int kms_valid,
1977 osc_enqueue_upcall_f upcall, void *cookie,
1978 struct ldlm_enqueue_info *einfo,
1979 struct ptlrpc_request_set *rqset, int async, int agl)
1981 struct obd_device *obd = exp->exp_obd;
1982 struct lustre_handle lockh = { 0 };
1983 struct ptlrpc_request *req = NULL;
1984 int intent = *flags & LDLM_FL_HAS_INTENT;
1985 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
1986 enum ldlm_mode mode;
1990 /* Filesystem lock extents are extended to page boundaries so that
1991 * dealing with the page cache is a little smoother. */
1992 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
1993 policy->l_extent.end |= ~PAGE_MASK;
1996 * kms is not valid when either object is completely fresh (so that no
1997 * locks are cached), or object was evicted. In the latter case cached
1998 * lock cannot be used, because it would prime inode state with
1999 * potentially stale LVB.
2004 /* Next, search for already existing extent locks that will cover us */
2005 /* If we're trying to read, we also search for an existing PW lock. The
2006 * VFS and page cache already protect us locally, so lots of readers/
2007 * writers can share a single PW lock.
2009 * There are problems with conversion deadlocks, so instead of
2010 * converting a read lock to a write lock, we'll just enqueue a new
2013 * At some point we should cancel the read lock instead of making them
2014 * send us a blocking callback, but there are problems with canceling
2015 * locks out from other users right now, too. */
2016 mode = einfo->ei_mode;
2017 if (einfo->ei_mode == LCK_PR)
2019 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2020 einfo->ei_type, policy, mode, &lockh, 0);
2022 struct ldlm_lock *matched;
2024 if (*flags & LDLM_FL_TEST_LOCK)
2027 matched = ldlm_handle2lock(&lockh);
2029 /* AGL enqueues DLM locks speculatively. Therefore if
2030 * it already exists a DLM lock, it wll just inform the
2031 * caller to cancel the AGL process for this stripe. */
2032 ldlm_lock_decref(&lockh, mode);
2033 LDLM_LOCK_PUT(matched);
2035 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2036 *flags |= LDLM_FL_LVB_READY;
2038 /* We already have a lock, and it's referenced. */
2039 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2041 ldlm_lock_decref(&lockh, mode);
2042 LDLM_LOCK_PUT(matched);
2045 ldlm_lock_decref(&lockh, mode);
2046 LDLM_LOCK_PUT(matched);
2051 if (*flags & LDLM_FL_TEST_LOCK)
/* no match: build a fresh enqueue request with room for the server LVB */
2055 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2056 &RQF_LDLM_ENQUEUE_LVB);
2060 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2062 ptlrpc_request_free(req);
2066 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2068 ptlrpc_request_set_replen(req);
2071 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2072 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2074 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2075 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* async path: park the completion context in rq_async_args */
2078 struct osc_enqueue_args *aa;
2079 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2080 aa = ptlrpc_req_async_args(req);
2082 aa->oa_mode = einfo->ei_mode;
2083 aa->oa_type = einfo->ei_type;
2084 lustre_handle_copy(&aa->oa_lockh, &lockh);
2085 aa->oa_upcall = upcall;
2086 aa->oa_cookie = cookie;
2089 aa->oa_flags = flags;
2092 /* AGL is essentially to enqueue an DLM lock
2093 * in advance, so we don't care about the
2094 * result of AGL enqueue. */
2096 aa->oa_flags = NULL;
2099 req->rq_interpret_reply =
2100 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2101 if (rqset == PTLRPCD_SET)
2102 ptlrpcd_add_req(req);
2104 ptlrpc_set_add_req(rqset, req);
2105 } else if (intent) {
2106 ptlrpc_req_finished(req);
/* synchronous path: complete the enqueue inline */
2111 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2114 ptlrpc_req_finished(req);
/*
 * Match an existing extent lock covering @policy on @res_id without
 * enqueuing a new one.  The extent is widened to page boundaries first;
 * on a match, @data is attached to the lock via osc_set_data_with_check.
 * A PR request that matched a PW lock is converted reference-wise
 * (addref PR, decref PW) so the caller holds the mode it asked for.
 */
2119 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2120 enum ldlm_type type, union ldlm_policy_data *policy,
2121 enum ldlm_mode mode, __u64 *flags, void *data,
2122 struct lustre_handle *lockh, int unref)
2124 struct obd_device *obd = exp->exp_obd;
2125 __u64 lflags = *flags;
2129 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2132 /* Filesystem lock extents are extended to page boundaries so that
2133 * dealing with the page cache is a little smoother */
2134 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2135 policy->l_extent.end |= ~PAGE_MASK;
2137 /* Next, search for already existing extent locks that will cover us */
2138 /* If we're trying to read, we also search for an existing PW lock. The
2139 * VFS and page cache already protect us locally, so lots of readers/
2140 * writers can share a single PW lock. */
/* rc carries the matched mode here (return value of ldlm_lock_match) */
2144 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2145 res_id, type, policy, rc, lockh, unref);
2148 if (!osc_set_data_with_check(lockh, data)) {
2149 if (!(lflags & LDLM_FL_TEST_LOCK))
2150 ldlm_lock_decref(lockh, rc);
/* matched a stronger mode than requested: swap the reference to PR */
2154 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2155 ldlm_lock_addref(lockh, LCK_PR);
2156 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Reply-interpret callback for an async OST_STATFS: unpack the
 * obd_statfs from the reply, copy it to the caller's buffer, and invoke
 * the oi_cb_up completion callback.
 */
2163 static int osc_statfs_interpret(const struct lu_env *env,
2164 struct ptlrpc_request *req,
2165 struct osc_async_args *aa, int rc)
2167 struct obd_statfs *msfs;
2171 /* The request has in fact never been sent
2172 * due to issues at a higher level (LOV).
2173 * Exit immediately since the caller is
2174 * aware of the problem and takes care
2175 * of the clean up */
/* NODELAY statfs must not block on a down OST; treat as non-fatal */
2178 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2179 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2185 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2187 GOTO(out, rc = -EPROTO);
2190 *aa->aa_oi->oi_osfs = *msfs;
2192 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS request; the reply is handled by
 * osc_statfs_interpret and delivered through oinfo->oi_cb_up.  The
 * request is added to @rqset rather than sent directly.
 */
2196 static int osc_statfs_async(struct obd_export *exp,
2197 struct obd_info *oinfo, __u64 max_age,
2198 struct ptlrpc_request_set *rqset)
2200 struct obd_device *obd = class_exp2obd(exp);
2201 struct ptlrpc_request *req;
2202 struct osc_async_args *aa;
2206 /* We could possibly pass max_age in the request (as an absolute
2207 * timestamp or a "seconds.usec ago") so the target can avoid doing
2208 * extra calls into the filesystem if that isn't necessary (e.g.
2209 * during mount that would help a bit). Having relative timestamps
2210 * is not so great if request processing is slow, while absolute
2211 * timestamps are not ideal because they need time synchronization. */
2212 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2216 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2218 ptlrpc_request_free(req);
2221 ptlrpc_request_set_replen(req);
2222 req->rq_request_portal = OST_CREATE_PORTAL;
2223 ptlrpc_at_set_req_timeout(req);
2225 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2226 /* procfs requests not want stat in wait for avoid deadlock */
2227 req->rq_no_resend = 1;
2228 req->rq_no_delay = 1;
2231 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2232 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2233 aa = ptlrpc_req_async_args(req);
2236 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: send the request, wait for the reply, and
 * copy the returned obd_statfs into @osfs.  Takes a reference on the
 * import under cl_sem to serialize with client_disconnect_export.
 */
2240 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2241 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2243 struct obd_device *obd = class_exp2obd(exp);
2244 struct obd_statfs *msfs;
2245 struct ptlrpc_request *req;
2246 struct obd_import *imp = NULL;
2250 /*Since the request might also come from lprocfs, so we need
2251 *sync this with client_disconnect_export Bug15684*/
2252 down_read(&obd->u.cli.cl_sem);
2253 if (obd->u.cli.cl_import)
2254 imp = class_import_get(obd->u.cli.cl_import);
2255 up_read(&obd->u.cli.cl_sem);
2259 /* We could possibly pass max_age in the request (as an absolute
2260 * timestamp or a "seconds.usec ago") so the target can avoid doing
2261 * extra calls into the filesystem if that isn't necessary (e.g.
2262 * during mount that would help a bit). Having relative timestamps
2263 * is not so great if request processing is slow, while absolute
2264 * timestamps are not ideal because they need time synchronization. */
2265 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* the import reference is only needed for request allocation */
2267 class_import_put(imp);
2272 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2274 ptlrpc_request_free(req);
2277 ptlrpc_request_set_replen(req);
2278 req->rq_request_portal = OST_CREATE_PORTAL;
2279 ptlrpc_at_set_req_timeout(req);
2281 if (flags & OBD_STATFS_NODELAY) {
2282 /* procfs requests not want stat in wait for avoid deadlock */
2283 req->rq_no_resend = 1;
2284 req->rq_no_delay = 1;
2287 rc = ptlrpc_queue_wait(req);
2291 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2293 GOTO(out, rc = -EPROTO);
2300 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher: handles import recovery, activation toggling
 * and target ping; unknown commands return -ENOTTY.  A module reference
 * is held for the duration of the call.
 */
2304 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2305 void *karg, void *uarg)
2307 struct obd_device *obd = exp->exp_obd;
2308 struct obd_ioctl_data *data = karg;
2312 if (!try_module_get(THIS_MODULE)) {
2313 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2314 module_name(THIS_MODULE));
2318 case OBD_IOC_CLIENT_RECOVER:
2319 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2320 data->ioc_inlbuf1, 0);
2324 case IOC_OSC_SET_ACTIVE:
2325 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2328 case OBD_IOC_PING_TARGET:
2329 err = ptlrpc_obd_ping(obd);
2332 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2333 cmd, current_comm());
2334 GOTO(out, err = -ENOTTY);
2337 module_put(THIS_MODULE);
/*
 * Handle set_info requests on this OSC.  Local keys (checksum toggle,
 * sptlrpc config/flush, client cache registration, LRU shrink) are
 * serviced without an RPC; everything else is forwarded to the OST as
 * an OST_SET_INFO request, with KEY_GRANT_SHRINK getting its own format
 * and interpret callback.
 */
2341 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2342 u32 keylen, void *key,
2343 u32 vallen, void *val,
2344 struct ptlrpc_request_set *set)
2346 struct ptlrpc_request *req;
2347 struct obd_device *obd = exp->exp_obd;
2348 struct obd_import *imp = class_exp2cliimp(exp);
2353 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* local key: flip bulk checksumming on/off */
2355 if (KEY_IS(KEY_CHECKSUM)) {
2356 if (vallen != sizeof(int))
2358 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2362 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2363 sptlrpc_conf_client_adapt(obd);
2367 if (KEY_IS(KEY_FLUSH_CTX)) {
2368 sptlrpc_import_flush_my_ctx(imp);
/* local key: attach this OSC to the shared client page cache / LRU */
2372 if (KEY_IS(KEY_CACHE_SET)) {
2373 struct client_obd *cli = &obd->u.cli;
2375 LASSERT(cli->cl_cache == NULL); /* only once */
2376 cli->cl_cache = (struct cl_client_cache *)val;
2377 cl_cache_incref(cli->cl_cache);
2378 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2380 /* add this osc into entity list */
2381 LASSERT(list_empty(&cli->cl_lru_osc));
2382 spin_lock(&cli->cl_cache->ccc_lru_lock);
2383 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2384 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* local key: shrink this OSC's LRU toward the requested target */
2389 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2390 struct client_obd *cli = &obd->u.cli;
2391 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2392 long target = *(long *)val;
2394 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* all remaining keys except GRANT_SHRINK require a request set */
2399 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2402 /* We pass all other commands directly to OST. Since nobody calls osc
2403 methods directly and everybody is supposed to go through LOV, we
2404 assume lov checked invalid values for us.
2405 The only recognised values so far are evict_by_nid and mds_conn.
2406 Even if something bad goes through, we'd get a -EINVAL from OST
2409 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2410 &RQF_OST_SET_GRANT_INFO :
2415 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2416 RCL_CLIENT, keylen);
2417 if (!KEY_IS(KEY_GRANT_SHRINK))
2418 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2419 RCL_CLIENT, vallen);
2420 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2422 ptlrpc_request_free(req);
2426 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2427 memcpy(tmp, key, keylen);
2428 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2431 memcpy(tmp, val, vallen);
/* grant shrink replies are interpreted asynchronously via ptlrpcd */
2433 if (KEY_IS(KEY_GRANT_SHRINK)) {
2434 struct osc_grant_args *aa;
2437 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2438 aa = ptlrpc_req_async_args(req);
2441 ptlrpc_req_finished(req);
2444 *oa = ((struct ost_body *)val)->oa;
2446 req->rq_interpret_reply = osc_shrink_grant_interpret;
2449 ptlrpc_request_set_replen(req);
2450 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2451 LASSERT(set != NULL);
2452 ptlrpc_set_add_req(set, req);
2453 ptlrpc_check_set(NULL, set);
2455 ptlrpcd_add_req(req);
/*
 * Recompute the grant to request from the OST on (re)connect.
 *
 * When the server supports grants (OBD_CONNECT_GRANT), ask for the
 * currently available grant plus the bytes covered by dirty pages;
 * when that sum is zero, fall back to twice the BRW size.  Lost grant
 * is read and reset under cl_loi_list_lock.
 *
 * NOTE(review): the parameter list is truncated in this view (the line
 * after "struct obd_connect_data *data," is elided) — the original
 * signature has at least one more parameter.
 */
2461 static int osc_reconnect(const struct lu_env *env,
2462 struct obd_export *exp, struct obd_device *obd,
2463 struct obd_uuid *cluuid,
2464 struct obd_connect_data *data,
2467 struct client_obd *cli = &obd->u.cli;
2469 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2472 spin_lock(&cli->cl_loi_list_lock);
2473 data->ocd_grant = (cli->cl_avail_grant +
2474 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2475 2 * cli_brw_size(obd);
/* Consume (and clear) any grant lost across the disconnect. */
2476 lost_grant = cli->cl_lost_grant;
2477 cli->cl_lost_grant = 0;
2478 spin_unlock(&cli->cl_loi_list_lock);
2480 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2481 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2482 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect this OSC's export and, once the import is known to be
 * destroyed, remove the client from the grant-shrink list.  The ordering
 * (disconnect first, shrink-list removal second) is deliberate — see the
 * race description below (BUG18662).
 */
2488 static int osc_disconnect(struct obd_export *exp)
2490 struct obd_device *obd = class_exp2obd(exp);
2493 rc = client_disconnect_export(exp);
2495 * Initially we put del_shrink_grant before disconnect_export, but it
2496 * causes the following problem if setup (connect) and cleanup
2497 * (disconnect) are tangled together.
2498 * connect p1 disconnect p2
2499 * ptlrpc_connect_import
2500 * ............... class_manual_cleanup
2503 * ptlrpc_connect_interrupt
2505 * add this client to shrink list
2507 * Bang! pinger trigger the shrink.
2508 * So the osc should be disconnected from the shrink list, after we
2509 * are sure the import has been destroyed. BUG18662
2511 if (obd->u.cli.cl_import == NULL)
2512 osc_del_shrink_grant(&obd->u.cli);
/*
 * React to import state transitions for this OSC device.
 *
 * Per-event behaviour visible here: DISCON zeroes the grant counters
 * under cl_loi_list_lock; INVALIDATE flushes queued I/O (which will fail
 * against the invalid import) and locally cleans the LDLM namespace; OCD
 * initializes grants and switches the request portal when the server
 * advertises the matching connect flags; the remaining events are
 * forwarded to the observer via obd_notify_observer().
 *
 * NOTE(review): the switch statement header and per-case break/return
 * lines are elided from this view (embedded line numbers jump).
 */
2516 static int osc_import_event(struct obd_device *obd,
2517 struct obd_import *imp,
2518 enum obd_import_event event)
2520 struct client_obd *cli;
2524 LASSERT(imp->imp_obd == obd);
/* Disconnected: all outstanding grant is gone. */
2527 case IMP_EVENT_DISCON: {
2529 spin_lock(&cli->cl_loi_list_lock);
2530 cli->cl_avail_grant = 0;
2531 cli->cl_lost_grant = 0;
2532 spin_unlock(&cli->cl_loi_list_lock);
2535 case IMP_EVENT_INACTIVE: {
2536 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/* Invalidate: fail queued pages and drop local DLM state. */
2539 case IMP_EVENT_INVALIDATE: {
2540 struct ldlm_namespace *ns = obd->obd_namespace;
2544 env = cl_env_get(&refcheck);
2548 /* all pages go to failing rpcs due to the invalid
2550 osc_io_unplug(env, cli, NULL);
2552 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2553 cl_env_put(env, &refcheck);
2558 case IMP_EVENT_ACTIVE: {
2559 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data received: apply server-negotiated features. */
2562 case IMP_EVENT_OCD: {
2563 struct obd_connect_data *ocd = &imp->imp_connect_data;
2565 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2566 osc_init_grant(&obd->u.cli, ocd);
2569 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2570 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2572 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2575 case IMP_EVENT_DEACTIVATE: {
2576 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2579 case IMP_EVENT_ACTIVATE: {
2580 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2584 CERROR("Unknown import event %d\n", event);
2591 * Determine whether the lock can be canceled before replaying the lock
2592 * during recovery, see bug16774 for detailed information.
2594 * \retval zero the lock can't be canceled
2595 * \retval other ok to cancel
2597 static int osc_cancel_weight(struct ldlm_lock *lock)
2600 * Cancel all unused and granted extent lock.
/*
 * Cancelable iff: extent lock, fully granted (granted mode equals the
 * requested mode), and osc_ldlm_weigh_ast() reports zero weight (no
 * users — per the surrounding doc comment).
 */
2602 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2603 lock->l_granted_mode == lock->l_req_mode &&
2604 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback: flush pending writeback for one client obd.
 * Registered via ptlrpcd_alloc_work() in osc_setup(); @data is the
 * struct client_obd it was registered with.
 */
2610 static int brw_queue_work(const struct lu_env *env, void *data)
2612 struct client_obd *cli = data;
2614 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2616 osc_io_unplug(env, cli, NULL);
/*
 * Set up one OSC obd_device: take a ptlrpcd reference, run the generic
 * client setup, allocate the writeback and LRU ptlrpcd work items, set up
 * quotas, procfs entries, top up the shared request pool, and register
 * the cancel-weight callback on the namespace.  Error paths tear down
 * the work items and the generic client state.
 *
 * NOTE(review): several lines (error-label definitions, returns, closing
 * braces) are elided from this view — the embedded line numbers jump.
 */
2620 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2622 struct client_obd *cli = &obd->u.cli;
2623 struct obd_type *type;
2631 rc = ptlrpcd_addref();
2635 rc = client_obd_setup(obd, lcfg);
2637 GOTO(out_ptlrpcd, rc);
/* Asynchronous writeback flusher, run from ptlrpcd context. */
2639 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2640 if (IS_ERR(handler))
2641 GOTO(out_client_setup, rc = PTR_ERR(handler));
2642 cli->cl_writeback_work = handler;
/* Asynchronous LRU shrinker, also run from ptlrpcd context. */
2644 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2645 if (IS_ERR(handler))
2646 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2647 cli->cl_lru_work = handler;
2649 rc = osc_quota_setup(obd);
2651 GOTO(out_ptlrpcd_work, rc);
2653 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2655 #ifdef CONFIG_PROC_FS
2656 obd->obd_vars = lprocfs_osc_obd_vars;
2658 /* If this is true then both client (osc) and server (osp) are on the
2659 * same node. The osp layer if loaded first will register the osc proc
2660 * directory. In that case this obd_device will be attached its proc
2661 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2662 type = class_search_type(LUSTRE_OSP_NAME);
2663 if (type && type->typ_procsym) {
2664 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2666 obd->obd_vars, obd);
2667 if (IS_ERR(obd->obd_proc_entry)) {
2668 rc = PTR_ERR(obd->obd_proc_entry);
2669 CERROR("error %d setting up lprocfs for %s\n", rc,
/* proc setup failure is non-fatal: continue without the entry. */
2671 obd->obd_proc_entry = NULL;
2674 rc = lprocfs_obd_setup(obd);
2677 /* If the basic OSC proc tree construction succeeded then
2678 * lets do the rest. */
2680 lproc_osc_attach_seqstat(obd);
2681 sptlrpc_lprocfs_cliobd_attach(obd);
2682 ptlrpc_lprocfs_register_obd(obd);
2686 * We try to control the total number of requests with a upper limit
2687 * osc_reqpool_maxreqcount. There might be some race which will cause
2688 * over-limit allocation, but it is fine.
2690 req_count = atomic_read(&osc_pool_req_count);
2691 if (req_count < osc_reqpool_maxreqcount) {
2692 adding = cli->cl_max_rpcs_in_flight + 2;
2693 if (req_count + adding > osc_reqpool_maxreqcount)
2694 adding = osc_reqpool_maxreqcount - req_count;
2696 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2697 atomic_add(added, &osc_pool_req_count);
2700 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2701 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Error unwind: destroy whichever work items were created. */
2705 if (cli->cl_writeback_work != NULL) {
2706 ptlrpcd_destroy_work(cli->cl_writeback_work);
2707 cli->cl_writeback_work = NULL;
2709 if (cli->cl_lru_work != NULL) {
2710 ptlrpcd_destroy_work(cli->cl_lru_work);
2711 cli->cl_lru_work = NULL;
2714 client_obd_cleanup(obd);
/*
 * Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS waits out the zombie-export thread (echo client
 * case — see comment below), destroys the ptlrpcd work items, tears down
 * the client import and unregisters the procfs entries.
 *
 * NOTE(review): the switch header and break statements are elided from
 * this view (embedded line numbers jump).
 */
2720 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2726 case OBD_CLEANUP_EARLY: {
2727 struct obd_import *imp;
2728 imp = obd->u.cli.cl_import;
2729 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2730 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2731 ptlrpc_deactivate_import(imp);
2732 spin_lock(&imp->imp_lock);
2733 imp->imp_pingable = 0;
2734 spin_unlock(&imp->imp_lock);
2737 case OBD_CLEANUP_EXPORTS: {
2738 struct client_obd *cli = &obd->u.cli;
2740 * for echo client, export may be on zombie list, wait for
2741 * zombie thread to cull it, because cli.cl_import will be
2742 * cleared in client_disconnect_export():
2743 * class_export_destroy() -> obd_cleanup() ->
2744 * echo_device_free() -> echo_client_cleanup() ->
2745 * obd_disconnect() -> osc_disconnect() ->
2746 * client_disconnect_export()
2748 obd_zombie_barrier();
2749 if (cli->cl_writeback_work) {
2750 ptlrpcd_destroy_work(cli->cl_writeback_work);
2751 cli->cl_writeback_work = NULL;
2753 if (cli->cl_lru_work) {
2754 ptlrpcd_destroy_work(cli->cl_lru_work);
2755 cli->cl_lru_work = NULL;
2757 obd_cleanup_client_import(obd);
2758 ptlrpc_lprocfs_unregister_obd(obd);
2759 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup of an OSC device: detach from the shared client cache
 * (unlink from the cache LRU under ccc_lru_lock and drop our reference),
 * free the quota cache, then run the generic client cleanup.
 */
2766 int osc_cleanup(struct obd_device *obd)
2768 struct client_obd *cli = &obd->u.cli;
2774 if (cli->cl_cache != NULL) {
2775 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2776 spin_lock(&cli->cl_cache->ccc_lru_lock);
2777 list_del_init(&cli->cl_lru_osc);
2778 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2779 cli->cl_lru_left = NULL;
2780 cl_cache_decref(cli->cl_cache);
2781 cli->cl_cache = NULL;
2784 /* free memory of osc quota cache */
2785 osc_quota_cleanup(obd);
2787 rc = client_obd_cleanup(obd);
/*
 * Apply an OSC proc parameter from a lustre_cfg record.  Positive return
 * values from class_process_proc_param() (bytes consumed) are mapped to
 * success (0); negative error codes are passed through.
 */
2793 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2795 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2796 return rc > 0 ? 0: rc;
/*
 * obd_ops config entry point; @len is unused here — the buffer is
 * forwarded directly to osc_process_config_base() as a lustre_cfg.
 */
2799 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2801 return osc_process_config_base(obd, buf);
/*
 * obd_ops method table for the OSC.  Connection management is delegated
 * to the generic client_* helpers; setup/cleanup, grant handling,
 * set_info, import events, config and quota use the osc_* entry points
 * defined in this file.
 */
2804 static struct obd_ops osc_obd_ops = {
2805 .o_owner = THIS_MODULE,
2806 .o_setup = osc_setup,
2807 .o_precleanup = osc_precleanup,
2808 .o_cleanup = osc_cleanup,
2809 .o_add_conn = client_import_add_conn,
2810 .o_del_conn = client_import_del_conn,
2811 .o_connect = client_connect_import,
2812 .o_reconnect = osc_reconnect,
2813 .o_disconnect = osc_disconnect,
2814 .o_statfs = osc_statfs,
2815 .o_statfs_async = osc_statfs_async,
2816 .o_create = osc_create,
2817 .o_destroy = osc_destroy,
2818 .o_getattr = osc_getattr,
2819 .o_setattr = osc_setattr,
2820 .o_iocontrol = osc_iocontrol,
2821 .o_set_info_async = osc_set_info_async,
2822 .o_import_event = osc_import_event,
2823 .o_process_config = osc_process_config,
2824 .o_quotactl = osc_quotactl,
/*
 * Module init: set up the lu_kmem caches, register the OSC obd type
 * (suppressing proc registration when OSP already owns the directory),
 * then size and create the shared request pool.  The pool request count
 * cap is derived from osc_reqpool_mem_max (MB) divided by the request
 * size rounded up to the next power of two >= OST_IO_MAXREQSIZE.
 *
 * NOTE(review): some lines (early returns, the reqsize initializer, the
 * out_type label) are elided from this view.
 */
2827 static int __init osc_init(void)
2829 bool enable_proc = true;
2830 struct obd_type *type;
2831 unsigned int reqpool_size;
2832 unsigned int reqsize;
2837 /* print an address of _any_ initialized kernel symbol from this
2838 * module, to allow debugging with gdb that doesn't support data
2839 * symbols from modules.*/
2840 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2842 rc = lu_kmem_init(osc_caches);
/* If OSP is loaded first it owns the shared proc dir; don't re-register. */
2846 type = class_search_type(LUSTRE_OSP_NAME);
2847 if (type != NULL && type->typ_procsym != NULL)
2848 enable_proc = false;
2850 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2851 LUSTRE_OSC_NAME, &osc_device_type);
2855 /* This is obviously too much memory, only prevent overflow here */
2856 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2857 GOTO(out_type, rc = -EINVAL);
2859 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the request size up to the next power of two. */
2862 while (reqsize < OST_IO_MAXREQSIZE)
2863 reqsize = reqsize << 1;
2866 * We don't enlarge the request count in OSC pool according to
2867 * cl_max_rpcs_in_flight. The allocation from the pool will only be
2868 * tried after normal allocation failed. So a small OSC pool won't
2869 * cause much performance degression in most of cases.
2871 osc_reqpool_maxreqcount = reqpool_size / reqsize;
2873 atomic_set(&osc_pool_req_count, 0);
2874 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
2875 ptlrpc_add_rqs_to_pool);
2877 if (osc_rq_pool != NULL)
/* Error unwind: undo type registration and cache init. */
2881 class_unregister_type(LUSTRE_OSC_NAME);
2883 lu_kmem_fini(osc_caches);
/*
 * Module exit: reverse of osc_init() — unregister the obd type, tear
 * down the lu_kmem caches and free the shared request pool.
 */
2888 static void /*__exit*/ osc_exit(void)
2890 class_unregister_type(LUSTRE_OSC_NAME);
2891 lu_kmem_fini(osc_caches);
2892 ptlrpc_free_rq_pool(osc_rq_pool);
/* Kernel module metadata and init/exit hook registration. */
2895 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2896 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2897 MODULE_VERSION(LUSTRE_VERSION_STRING);
2898 MODULE_LICENSE("GPL");
2900 module_init(osc_init);
2901 module_exit(osc_exit);