4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
/*
 * Per-request context for bulk read/write (BRW) RPCs, stored in the
 * request's rq_async_args and unpacked by the reply interpreter.
 * NOTE(review): this extract is line-sampled; some members (and the
 * closing brace) of the struct are not visible here — confirm upstream.
 */
55 struct osc_brw_async_args {
61 struct brw_page **aa_ppga;
62 struct client_obd *aa_cli;
63 struct list_head aa_oaps;
64 struct list_head aa_exts;
65 struct obd_capa *aa_ocapa;
66 struct cl_req *aa_clerq;
/* Grant-shrink RPCs reuse the BRW async-args layout (see
 * osc_shrink_grant_interpret, which casts its arg to osc_grant_args). */
69 #define osc_grant_args osc_brw_async_args
/*
 * Async-args for OST_SETATTR/OST_PUNCH RPCs: the upcall (and, per the
 * uses below, an sa_oa obdo pointer and sa_cookie not visible in this
 * extract) delivered to osc_setattr_interpret on reply.
 */
71 struct osc_setattr_args {
73 obd_enqueue_update_f sa_upcall;
/*
 * Async-args for OST_SYNC RPCs, consumed by osc_sync_interpret: the
 * obd_info whose oi_oa is refreshed from the reply, plus the caller's
 * completion upcall (fa_cookie member not visible in this extract).
 */
77 struct osc_fsync_args {
78 struct obd_info *fa_oi;
79 obd_enqueue_update_f fa_upcall;
/*
 * Async-args for DLM lock enqueue requests: export, completion upcall,
 * server-returned LVB, the lock handle, and an AGL (asynchronous
 * glimpse lock) flag bit.
 * NOTE(review): interior members are missing from this extract.
 */
83 struct osc_enqueue_args {
84 struct obd_export *oa_exp;
88 osc_enqueue_upcall_f oa_upcall;
90 struct ost_lvb *oa_lvb;
91 struct lustre_handle oa_lockh;
92 unsigned int oa_agl:1;
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Pack the OSS capability @capa into the RMF_CAPA1 request field and
 * advertise it via OBD_MD_FLOSSCAPA in body->oa.o_valid.
 * NOTE(review): the NULL-capa early return and the copy into @c are
 * among the original lines missing from this extract — confirm upstream.
 */
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100 struct ost_body *body, void *capa)
102 struct obd_capa *oc = (struct obd_capa *)capa;
103 struct lustre_capa *c;
108 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
111 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * Fill the RMF_OST_BODY request buffer from @oinfo: convert oi_oa to
 * wire format (lustre_set_wire_obdo) and pack the capability, if any.
 * NOTE(review): lines are missing from this extract (e.g. the second
 * argument line of the lustre_set_wire_obdo call).
 */
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
117 struct ost_body *body;
119 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
122 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
124 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * Size the given capability field in the request capsule: zero when no
 * capability is supplied (the visible branch); otherwise the field stays
 * at its default, already sizeof(struct obd_capa) per the comment below.
 * NOTE(review): the capa parameter and the if/else around these lines
 * are missing from this extract.
 */
127 void osc_set_capa_size(struct ptlrpc_request *req,
128 const struct req_msg_field *field,
132 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
134 /* it is already calculated as sizeof struct obd_capa */
/*
 * Reply interpreter for async OST_GETATTR: on success unpack the server
 * ost_body into aa->aa_oi->oi_oa, fake the block size (o_blksize) the
 * OST does not send, then invoke the caller's oi_cb_up completion.
 * On unpack failure o_valid is cleared (see the CDEBUG branch).
 * NOTE(review): the surrounding if/else and early-exit lines are
 * missing from this line-sampled extract.
 */
138 int osc_getattr_interpret(const struct lu_env *env,
139 struct ptlrpc_request *req,
140 struct osc_async_args *aa, int rc)
142 struct ost_body *body;
148 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
150 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
152 aa->aa_oi->oi_oa, &body->oa);
154 /* This should really be sent by the OST */
155 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
156 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
158 CDEBUG(D_INFO, "can't unpack ost_body\n");
160 aa->aa_oi->oi_oa->o_valid = 0;
163 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Synchronous OST_GETATTR: allocate and pack the request, queue and
 * wait, then unpack the returned attributes into oinfo->oi_oa and set
 * a client-side o_blksize (cli_brw_size) since the OST does not send it.
 * Returns 0 or a negative errno; -EPROTO if the reply body is missing.
 * NOTE(review): NULL/rc checks, GOTO targets and the closing brace are
 * among the lines missing from this extract.
 */
167 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
168 struct obd_info *oinfo)
170 struct ptlrpc_request *req;
171 struct ost_body *body;
175 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
179 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
180 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
182 ptlrpc_request_free(req);
186 osc_pack_req_body(req, oinfo);
188 ptlrpc_request_set_replen(req);
190 rc = ptlrpc_queue_wait(req);
194 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196 GOTO(out, rc = -EPROTO);
198 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
202 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
203 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
207 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: requires o_valid to carry OBD_MD_FLGROUP
 * (asserted), packs oinfo into the request, waits for the reply and
 * copies the server's updated obdo back into oinfo->oi_oa.
 * Returns 0 or a negative errno; -EPROTO on a missing reply body.
 * NOTE(review): error-path lines and braces are missing from this
 * line-sampled extract.
 */
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212 struct obd_info *oinfo)
214 struct ptlrpc_request *req;
215 struct ost_body *body;
219 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
221 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
225 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
228 ptlrpc_request_free(req);
232 osc_pack_req_body(req, oinfo);
234 ptlrpc_request_set_replen(req);
236 rc = ptlrpc_queue_wait(req);
240 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
242 GOTO(out, rc = -EPROTO);
244 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
249 ptlrpc_req_finished(req);
/*
 * Shared reply interpreter for async setattr/punch RPCs: unpack the
 * server obdo back into sa->sa_oa, then hand rc to the caller's
 * sa_upcall with its sa_cookie.
 * NOTE(review): the rc-guard before unpacking and the out label are
 * missing from this extract.
 */
253 static int osc_setattr_interpret(const struct lu_env *env,
254 struct ptlrpc_request *req,
255 struct osc_setattr_args *sa, int rc)
257 struct ost_body *body;
263 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
265 GOTO(out, rc = -EPROTO);
267 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
270 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR (MDS-to-OST path): pack the request, attach
 * osc_setattr_interpret plus the caller's upcall/cookie in the async
 * args, then either hand the request to a ptlrpcd daemon (PTLRPCD_SET,
 * or the no-upcall fire-and-forget branch) or add it to @rqset.
 * NOTE(review): the extract is line-sampled — the branch structure
 * around the two ptlrpcd_add_req() calls (the "no rqset" case vs the
 * PTLRPCD_SET case) is incomplete here; confirm against upstream
 * before reasoning about control flow.
 */
274 int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
275 obd_enqueue_update_f upcall, void *cookie,
276 struct ptlrpc_request_set *rqset)
278 struct ptlrpc_request *req;
279 struct osc_setattr_args *sa;
283 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
287 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
288 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
290 ptlrpc_request_free(req);
294 osc_pack_req_body(req, oinfo);
296 ptlrpc_request_set_replen(req);
298 /* do mds to ost setattr asynchronously */
300 /* Do not wait for response. */
301 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
303 req->rq_interpret_reply =
304 (ptlrpc_interpterer_t)osc_setattr_interpret;
306 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
307 sa = ptlrpc_req_async_args(req);
308 sa->sa_oa = oinfo->oi_oa;
309 sa->sa_upcall = upcall;
310 sa->sa_cookie = cookie;
312 if (rqset == PTLRPCD_SET)
313 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
315 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE, used only for echo objects (asserted via
 * fid_seq_is_echo): pack @oa, wait for the reply, copy the created
 * object's attributes back into @oa and set the client-side block size.
 * Returns 0 or a negative errno (-ENOMEM on alloc failure, -EPROTO on
 * a missing reply body).
 * NOTE(review): the oa parameter line, rc checks and labels are missing
 * from this line-sampled extract.
 */
321 static int osc_create(const struct lu_env *env, struct obd_export *exp,
324 struct ptlrpc_request *req;
325 struct ost_body *body;
330 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
331 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
333 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
335 GOTO(out, rc = -ENOMEM);
337 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
339 ptlrpc_request_free(req);
343 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
346 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
348 ptlrpc_request_set_replen(req);
350 rc = ptlrpc_queue_wait(req);
354 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
356 GOTO(out_req, rc = -EPROTO);
358 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
359 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
361 oa->o_blksize = cli_brw_size(exp->exp_obd);
362 oa->o_valid |= OBD_MD_FLBLKSZ;
364 CDEBUG(D_HA, "transno: "LPD64"\n",
365 lustre_msg_get_transno(req->rq_repmsg));
367 ptlrpc_req_finished(req);
/*
 * Asynchronous OST_PUNCH (truncate/hole-punch): build the request on
 * the I/O portal, pack oinfo->oi_oa and the capability, route the reply
 * through osc_setattr_interpret, and submit via ptlrpcd (PTLRPCD_SET)
 * or the caller's @rqset.
 * NOTE(review): alloc/pack error checks and the RETURN lines are
 * missing from this line-sampled extract.
 */
372 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
373 obd_enqueue_update_f upcall, void *cookie,
374 struct ptlrpc_request_set *rqset)
376 struct ptlrpc_request *req;
377 struct osc_setattr_args *sa;
378 struct ost_body *body;
382 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
386 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
387 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
389 ptlrpc_request_free(req);
392 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
393 ptlrpc_at_set_req_timeout(req);
395 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
397 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
399 osc_pack_capa(req, body, oinfo->oi_capa);
401 ptlrpc_request_set_replen(req);
403 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405 sa = ptlrpc_req_async_args(req);
406 sa->sa_oa = oinfo->oi_oa;
407 sa->sa_upcall = upcall;
408 sa->sa_cookie = cookie;
409 if (rqset == PTLRPCD_SET)
410 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412 ptlrpc_set_add_req(rqset, req);
/*
 * Reply interpreter for OST_SYNC: copy the server's obdo into the
 * caller's oi_oa and invoke the fsync upcall with its cookie.
 * Fails with -EPROTO when the reply body cannot be unpacked.
 * NOTE(review): the arg/rc parameters and rc guard lines are missing
 * from this line-sampled extract.
 */
417 static int osc_sync_interpret(const struct lu_env *env,
418 struct ptlrpc_request *req,
421 struct osc_fsync_args *fa = arg;
422 struct ost_body *body;
428 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
430 CERROR ("can't unpack ost_body\n");
431 GOTO(out, rc = -EPROTO);
434 *fa->fa_oi->oi_oa = body->oa;
436 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Asynchronous OST_SYNC: the sync range travels in the oa size/blocks
 * fields (see the "overload" comment below). Attaches osc_sync_interpret
 * plus the caller's upcall/cookie, then submits via ptlrpcd
 * (PTLRPCD_SET) or the supplied @rqset.
 * NOTE(review): alloc/pack error checks and fa_oi assignment are among
 * the lines missing from this line-sampled extract.
 */
440 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
441 obd_enqueue_update_f upcall, void *cookie,
442 struct ptlrpc_request_set *rqset)
444 struct ptlrpc_request *req;
445 struct ost_body *body;
446 struct osc_fsync_args *fa;
450 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
454 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
455 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
457 ptlrpc_request_free(req);
461 /* overload the size and blocks fields in the oa with start/end */
462 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
464 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
466 osc_pack_capa(req, body, oinfo->oi_capa);
468 ptlrpc_request_set_replen(req);
469 req->rq_interpret_reply = osc_sync_interpret;
471 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
472 fa = ptlrpc_req_async_args(req);
474 fa->fa_upcall = upcall;
475 fa->fa_cookie = cookie;
477 if (rqset == PTLRPCD_SET)
478 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
480 ptlrpc_set_add_req(rqset, req);
485 /* Find and cancel locally locks matched by @mode in the resource found by
486 * @objid. Found locks are added into @cancel list. Returns the amount of
487 * locks added to @cancels list. */
/*
 * See the block comment above: cancel local unused locks on the object
 * named by @oa in mode @mode, collecting them on @cancels; returns the
 * number of locks added (count). Skips entirely when ELC is supported
 * by the export but disabled via procfs on the namespace.
 * NOTE(review): the count declaration, the early returns and the final
 * RETURN(count) are missing from this line-sampled extract.
 */
488 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
489 struct list_head *cancels,
490 ldlm_mode_t mode, __u64 lock_flags)
492 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
493 struct ldlm_res_id res_id;
494 struct ldlm_resource *res;
498 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
499 * export) but disabled through procfs (flag in NS).
501 * This distinguishes from a case when ELC is not supported originally,
502 * when we still want to cancel locks in advance and just cancel them
503 * locally, without sending any RPC. */
504 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
507 ostid_build_res_name(&oa->o_oi, &res_id);
508 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
512 LDLM_RESOURCE_ADDREF(res);
513 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
514 lock_flags, 0, NULL);
515 LDLM_RESOURCE_DELREF(res);
516 ldlm_resource_putref(res);
/*
 * Reply interpreter for OST_DESTROY: drop the in-flight destroy count
 * and wake any sender throttled in osc_destroy() waiting on
 * cl_destroy_waitq (see osc_can_send_destroy below).
 */
520 static int osc_destroy_interpret(const struct lu_env *env,
521 struct ptlrpc_request *req, void *data,
524 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
526 atomic_dec(&cli->cl_destroy_in_flight);
527 wake_up(&cli->cl_destroy_waitq);
/*
 * Throttle check for destroy RPCs: optimistically take a slot with
 * atomic_inc_return; if that exceeds cl_max_rpcs_in_flight, give the
 * slot back with atomic_dec_return and wake another waiter if the
 * counter moved in between (the visible inner branch).
 * NOTE(review): the return statements of both branches are missing
 * from this line-sampled extract.
 */
531 static int osc_can_send_destroy(struct client_obd *cli)
533 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
534 cli->cl_max_rpcs_in_flight) {
535 /* The destroy request can be sent */
538 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
539 cli->cl_max_rpcs_in_flight) {
541 * The counter has been modified between the two atomic
544 wake_up(&cli->cl_destroy_waitq);
/*
 * OST_DESTROY: cancel unused local PW locks on the object (ELC, with
 * LDLM_FL_DISCARD_DATA), build the destroy request on the I/O portal,
 * throttle against cl_max_rpcs_in_flight via osc_can_send_destroy /
 * l_wait_event_exclusive, then fire-and-forget through ptlrpcd; the
 * in-flight counter is released by osc_destroy_interpret.
 * NOTE(review): the oa parameter, NULL checks and error paths are
 * missing from this line-sampled extract.
 */
549 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
552 struct client_obd *cli = &exp->exp_obd->u.cli;
553 struct ptlrpc_request *req;
554 struct ost_body *body;
555 struct list_head cancels = LIST_HEAD_INIT(cancels);
560 CDEBUG(D_INFO, "oa NULL\n");
564 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
565 LDLM_FL_DISCARD_DATA);
567 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* On alloc failure the collected cancel locks must be released. */
569 ldlm_lock_list_put(&cancels, l_bl_ast, count);
573 osc_set_capa_size(req, &RMF_CAPA1, NULL);
574 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
577 ptlrpc_request_free(req);
581 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
582 ptlrpc_at_set_req_timeout(req);
584 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
586 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
588 ptlrpc_request_set_replen(req);
590 req->rq_interpret_reply = osc_destroy_interpret;
591 if (!osc_can_send_destroy(cli)) {
592 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
595 * Wait until the number of on-going destroy RPCs drops
596 * under max_rpc_in_flight
598 l_wait_event_exclusive(cli->cl_destroy_waitq,
599 osc_can_send_destroy(cli), &lwi);
602 /* Do not wait for response */
603 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Fill grant/dirty accounting into @oa before sending an RPC: under
 * cl_loi_list_lock report the client's dirty page count (o_dirty), how
 * much more it could dirty (o_undirty), the grant it holds (o_grant =
 * avail + reserved) and grant lost since last report (o_dropped,
 * clearing cl_lost_grant). The unlikely() branches only sanity-check
 * the per-client and global dirty counters and CERROR on inconsistency.
 * NOTE(review): this extract is line-sampled — the o_valid updates,
 * several closing braces and parts of the o_undirty computation are
 * missing; confirm against upstream before editing.
 */
607 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
610 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
612 LASSERT(!(oa->o_valid & bits));
615 spin_lock(&cli->cl_loi_list_lock);
616 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
617 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
618 cli->cl_dirty_max_pages)) {
619 CERROR("dirty %lu - %lu > dirty_max %lu\n",
620 cli->cl_dirty_pages, cli->cl_dirty_transit,
621 cli->cl_dirty_max_pages);
623 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
624 atomic_long_read(&obd_dirty_transit_pages) >
625 (obd_max_dirty_pages + 1))) {
626 /* The atomic_read() allowing the atomic_inc() are
627 * not covered by a lock thus they may safely race and trip
628 * this CERROR() unless we add in a small fudge factor (+1). */
629 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
630 cli->cl_import->imp_obd->obd_name,
631 atomic_long_read(&obd_dirty_pages),
632 atomic_long_read(&obd_dirty_transit_pages),
633 obd_max_dirty_pages);
635 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
637 CERROR("dirty %lu - dirty_max %lu too big???\n",
638 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
641 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
643 (cli->cl_max_rpcs_in_flight + 1);
644 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
647 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
648 oa->o_dropped = cli->cl_lost_grant;
649 cli->cl_lost_grant = 0;
650 spin_unlock(&cli->cl_loi_list_lock);
651 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
652 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Re-arm the grant-shrink timer: schedule the next shrink attempt
 * cl_grant_shrink_interval seconds from now.
 */
656 void osc_update_next_shrink(struct client_obd *cli)
658 cli->cl_next_shrink_grant =
659 cfs_time_shift(cli->cl_grant_shrink_interval);
660 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
661 cli->cl_next_shrink_grant);
/* Add @grant back to cl_avail_grant under cl_loi_list_lock. */
664 static void __osc_update_grant(struct client_obd *cli, u64 grant)
666 spin_lock(&cli->cl_loi_list_lock);
667 cli->cl_avail_grant += grant;
668 spin_unlock(&cli->cl_loi_list_lock);
/*
 * Absorb any extra grant the server piggybacked on a reply: only acts
 * when the reply advertises OBD_MD_FLGRANT.
 */
671 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
673 if (body->oa.o_valid & OBD_MD_FLGRANT) {
674 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
675 __osc_update_grant(cli, body->oa.o_grant);
679 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
680 u32 keylen, void *key,
681 u32 vallen, void *val,
682 struct ptlrpc_request_set *set);
/*
 * Reply interpreter for a grant-shrink set_info RPC: on failure return
 * the shrunk amount (oa->o_grant) to cl_avail_grant; on success absorb
 * whatever grant the server's reply body reports.
 * NOTE(review): the aa/rc parameters, the rc branch and the oa free are
 * missing from this line-sampled extract.
 */
684 static int osc_shrink_grant_interpret(const struct lu_env *env,
685 struct ptlrpc_request *req,
688 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
689 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
690 struct ost_body *body;
693 __osc_update_grant(cli, oa->o_grant);
697 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
699 osc_update_grant(cli, body);
/*
 * Piggyback a local grant shrink on an outgoing BRW: give back a
 * quarter of cl_avail_grant via oa->o_grant, flag it with
 * OBD_FL_SHRINK_GRANT (initializing o_flags if FLFLAGS was unset),
 * and re-arm the shrink timer.
 * NOTE(review): the o_flags = 0 line inside the if and the closing
 * brace are missing from this extract.
 */
705 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
707 spin_lock(&cli->cl_loi_list_lock);
708 oa->o_grant = cli->cl_avail_grant / 4;
709 cli->cl_avail_grant -= oa->o_grant;
710 spin_unlock(&cli->cl_loi_list_lock);
711 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
712 oa->o_valid |= OBD_MD_FLFLAGS;
715 oa->o_flags |= OBD_FL_SHRINK_GRANT;
716 osc_update_next_shrink(cli);
719 /* Shrink the current grant, either from some large amount to enough for a
720 * full set of in-flight RPCs, or if we have already shrunk to that limit
721 * then to enough for a single RPC. This avoids keeping more grant than
722 * needed, and avoids shrinking the grant piecemeal. */
/*
 * See the block comment above: pick the shrink target — enough for a
 * full set of in-flight RPCs, or a single RPC if already at/below that
 * — and delegate to osc_shrink_grant_to_target().
 */
723 static int osc_shrink_grant(struct client_obd *cli)
725 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
726 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
728 spin_lock(&cli->cl_loi_list_lock);
729 if (cli->cl_avail_grant <= target_bytes)
730 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
731 spin_unlock(&cli->cl_loi_list_lock);
733 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Shrink this client's grant down to @target_bytes (clamped to at least
 * one RPC's worth): move the surplus into body->oa.o_grant, flag it
 * with OBD_FL_SHRINK_GRANT, and ship it to the server via a
 * KEY_GRANT_SHRINK set_info_async on the self export. On send failure
 * the surplus is returned to cl_avail_grant.
 * NOTE(review): the body allocation/free and the rc declaration are
 * among the lines missing from this line-sampled extract.
 */
736 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
739 struct ost_body *body;
742 spin_lock(&cli->cl_loi_list_lock);
743 /* Don't shrink if we are already above or below the desired limit
744 * We don't want to shrink below a single RPC, as that will negatively
745 * impact block allocation and long-term performance. */
746 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
747 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
749 if (target_bytes >= cli->cl_avail_grant) {
750 spin_unlock(&cli->cl_loi_list_lock);
753 spin_unlock(&cli->cl_loi_list_lock);
759 osc_announce_cached(cli, &body->oa, 0);
761 spin_lock(&cli->cl_loi_list_lock);
762 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
763 cli->cl_avail_grant = target_bytes;
764 spin_unlock(&cli->cl_loi_list_lock);
765 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
766 body->oa.o_valid |= OBD_MD_FLFLAGS;
767 body->oa.o_flags = 0;
769 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
770 osc_update_next_shrink(cli);
772 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
773 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
774 sizeof(*body), body, NULL);
776 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant: requires the
 * server to support OBD_CONNECT_GRANT_SHRINK, the shrink deadline to be
 * (nearly) due, a FULL import, and more available grant than one RPC's
 * worth. If due but not worth shrinking, just re-arm the timer.
 * NOTE(review): the return statements of each branch are missing from
 * this line-sampled extract.
 */
781 static int osc_should_shrink_grant(struct client_obd *client)
783 cfs_time_t time = cfs_time_current();
784 cfs_time_t next_shrink = client->cl_next_shrink_grant;
786 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
787 OBD_CONNECT_GRANT_SHRINK) == 0)
790 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
791 /* Get the current RPC size directly, instead of going via:
792 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
793 * Keep comment here so that it can be found by searching. */
794 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
796 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
797 client->cl_avail_grant > brw_size)
800 osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk every client on the timeout item's
 * list and shrink grant for those that osc_should_shrink_grant() says
 * are due.
 */
805 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
807 struct client_obd *client;
809 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
810 if (osc_should_shrink_grant(client))
811 osc_shrink_grant(client);
/*
 * Register this client with the ptlrpc timeout machinery so
 * osc_grant_shrink_grant_cb fires every cl_grant_shrink_interval;
 * logs and returns the error on failure, otherwise arms the first
 * shrink deadline.
 * NOTE(review): the rc declaration and return lines are missing from
 * this line-sampled extract.
 */
816 static int osc_add_shrink_grant(struct client_obd *client)
820 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
822 osc_grant_shrink_grant_cb, NULL,
823 &client->cl_grant_shrink_list);
825 CERROR("add grant client %s error %d\n",
826 client->cl_import->imp_obd->obd_name, rc);
829 CDEBUG(D_CACHE, "add grant client %s \n",
830 client->cl_import->imp_obd->obd_name);
831 osc_update_next_shrink(client);
/*
 * Unregister the client from the grant-shrink timeout list (inverse of
 * osc_add_shrink_grant); trailing arguments of the call are not visible
 * in this extract.
 */
835 static int osc_del_shrink_grant(struct client_obd *client)
837 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize grant state at connect time from the server's
 * obd_connect_data: after eviction take ocd_grant verbatim, otherwise
 * subtract the pages already dirty; negative results indicate an
 * unpatched server and are clamped back to ocd_grant. Also derives
 * cl_chunkbits from the server block size and registers for periodic
 * grant shrinking when the server supports it.
 * NOTE(review): several comment/brace lines are missing from this
 * line-sampled extract.
 */
841 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
844 * ocd_grant is the total grant amount we're expect to hold: if we've
845 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
846 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
849 * race is tolerable here: if we're evicted, but imp_state already
850 * left EVICTED state, then cl_dirty_pages must be 0 already.
852 spin_lock(&cli->cl_loi_list_lock);
853 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
854 cli->cl_avail_grant = ocd->ocd_grant;
856 cli->cl_avail_grant = ocd->ocd_grant -
857 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
859 if (cli->cl_avail_grant < 0) {
860 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
861 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
862 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
863 /* workaround for servers which do not have the patch from
865 cli->cl_avail_grant = ocd->ocd_grant;
868 /* determine the appropriate chunk size used by osc_extent. */
869 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
870 spin_unlock(&cli->cl_loi_list_lock);
872 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
873 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
874 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
876 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
877 list_empty(&cli->cl_grant_shrink_list))
878 osc_add_shrink_grant(cli);
881 /* We assume that the reason this OSC got a short read is because it read
882 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
883 * via the LOV, and it _knows_ it's reading inside the file, it's just that
884 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * See the block comment above: after a short read, zero-fill the tail
 * of the page where EOF landed and all remaining pages, so the client
 * observes zeros past the written extent.
 * NOTE(review): the loop index/ptr declarations, kunmap() calls and
 * increments are among the lines missing from this line-sampled
 * extract — the visible kmap() calls have no visible matching kunmap().
 */
885 static void handle_short_read(int nob_read, size_t page_count,
886 struct brw_page **pga)
891 /* skip bytes read OK */
892 while (nob_read > 0) {
893 LASSERT (page_count > 0);
895 if (pga[i]->count > nob_read) {
896 /* EOF inside this page */
897 ptr = kmap(pga[i]->pg) +
898 (pga[i]->off & ~PAGE_MASK);
899 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
906 nob_read -= pga[i]->count;
911 /* zero remaining pages */
912 while (page_count-- > 0) {
913 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
914 memset(ptr, 0, pga[i]->count);
/*
 * Validate a BRW_WRITE reply: fetch the per-niobuf RC vector (RMF_RCS),
 * fail if it is missing/short, propagate the first negative per-niobuf
 * rc, reject nonzero non-negative rcs as protocol errors, and verify
 * the bulk transferred exactly the requested byte count.
 * NOTE(review): the return statements of several branches are missing
 * from this line-sampled extract.
 */
920 static int check_write_rcs(struct ptlrpc_request *req,
921 int requested_nob, int niocount,
922 size_t page_count, struct brw_page **pga)
927 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
928 sizeof(*remote_rcs) *
930 if (remote_rcs == NULL) {
931 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
935 /* return error if any niobuf was in error */
936 for (i = 0; i < niocount; i++) {
937 if ((int)remote_rcs[i] < 0)
938 return(remote_rcs[i]);
940 if (remote_rcs[i] != 0) {
941 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
942 i, remote_rcs[i], req);
947 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
948 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
949 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can be merged into one niobuf when they are
 * byte-contiguous (p1->off + p1->count == p2->off); differing flags are
 * tolerated only for the known-benign set masked out below, anything
 * else is warned about.
 * NOTE(review): the "return 0" for un-mergeable flags and closing
 * braces are missing from this line-sampled extract.
 */
956 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
958 if (p1->flag != p2->flag) {
959 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
960 OBD_BRW_SYNC | OBD_BRW_ASYNC |
961 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
963 /* warn if we try to combine flags that we don't know to be
965 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
966 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
967 "report this at https://jira.hpdd.intel.com/\n",
973 return (p1->off + p1->count == p2->off);
/*
 * Compute the bulk checksum over @nob bytes spread across @pga pages
 * using the libcfs crypto hash selected by @cksum_type. Contains two
 * fault-injection hooks: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts page 0
 * before hashing (simulated OST->client corruption on read) and
 * OBD_FAIL_OSC_CHECKSUM_SEND falsifies only the checksum on write so
 * a resend still carries correct data.
 * NOTE(review): cksum/err/i declarations, kunmap(), loop increments and
 * the final return are missing from this line-sampled extract.
 */
976 static u32 osc_checksum_bulk(int nob, size_t pg_count,
977 struct brw_page **pga, int opc,
978 cksum_type_t cksum_type)
982 struct cfs_crypto_hash_desc *hdesc;
983 unsigned int bufsize;
985 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
987 LASSERT(pg_count > 0);
989 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
991 CERROR("Unable to initialize checksum hash %s\n",
992 cfs_crypto_hash_name(cfs_alg));
993 return PTR_ERR(hdesc);
996 while (nob > 0 && pg_count > 0) {
997 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
999 /* corrupt the data before we compute the checksum, to
1000 * simulate an OST->client data error */
1001 if (i == 0 && opc == OST_READ &&
1002 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1003 unsigned char *ptr = kmap(pga[i]->pg);
1004 int off = pga[i]->off & ~PAGE_MASK;
1006 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1009 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1010 pga[i]->off & ~PAGE_MASK,
1012 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1013 (int)(pga[i]->off & ~PAGE_MASK));
1015 nob -= pga[i]->count;
1020 bufsize = sizeof(cksum);
1021 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1023 /* For sending we only compute the wrong checksum instead
1024 * of corrupting the data so it is still correct on a redo */
1025 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a bulk read/write (BRW) RPC for @page_count pages in @pga:
 *  - allocate the request (writes from the import's emergency pool),
 *  - count mergeable niobufs and size the capsule fields accordingly,
 *  - prepare the bulk descriptor (GET_SOURCE for write, PUT_SINK for
 *    read) and register each page,
 *  - pack obdo/ioobj/niobuf wire structures, capability, grant
 *    accounting (osc_announce_cached) and optional local grant shrink,
 *  - compute/flag the bulk checksum for writes (or request one for
 *    reads) when cl_checksum is on and sptlrpc doesn't already protect
 *    the bulk,
 *  - stash bookkeeping in osc_brw_async_args for the reply interpreter.
 * Returns 0 with *reqp set, or a negative errno (the out: path frees
 * the request).
 * NOTE(review): this extract is line-sampled — ENTRY/RETURN lines,
 * several if-conditions, brace lines, aa->aa_oa assignment and the
 * opc/resend handling are not visible; confirm control flow against
 * upstream Lustre before modifying.
 */
1032 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1033 u32 page_count, struct brw_page **pga,
1034 struct ptlrpc_request **reqp, struct obd_capa *ocapa,
1035 int reserve, int resend)
1037 struct ptlrpc_request *req;
1038 struct ptlrpc_bulk_desc *desc;
1039 struct ost_body *body;
1040 struct obd_ioobj *ioobj;
1041 struct niobuf_remote *niobuf;
1042 int niocount, i, requested_nob, opc, rc;
1043 struct osc_brw_async_args *aa;
1044 struct req_capsule *pill;
1045 struct brw_page *pg_prev;
1048 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1049 RETURN(-ENOMEM); /* Recoverable */
1050 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1051 RETURN(-EINVAL); /* Fatal */
1053 if ((cmd & OBD_BRW_WRITE) != 0) {
1055 req = ptlrpc_request_alloc_pool(cli->cl_import,
1056 cli->cl_import->imp_rq_pool,
1057 &RQF_OST_BRW_WRITE);
1060 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Count how many distinct niobufs are needed: adjacent mergeable
 * pages share one (see can_merge_pages). */
1065 for (niocount = i = 1; i < page_count; i++) {
1066 if (!can_merge_pages(pga[i - 1], pga[i]))
1070 pill = &req->rq_pill;
1071 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1073 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1074 niocount * sizeof(*niobuf));
1075 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1077 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1079 ptlrpc_request_free(req);
1082 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1083 ptlrpc_at_set_req_timeout(req);
1084 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1086 req->rq_no_retry_einprogress = 1;
1088 desc = ptlrpc_prep_bulk_imp(req, page_count,
1089 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1090 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1094 GOTO(out, rc = -ENOMEM);
1095 /* NB request now owns desc and will free it when it gets freed */
1097 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1098 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1099 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1100 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1102 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1104 obdo_to_ioobj(oa, ioobj);
1105 ioobj->ioo_bufcnt = niocount;
1106 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1107 * that might be send for this request. The actual number is decided
1108 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1109 * "max - 1" for old client compatibility sending "0", and also so the
1110 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1111 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1112 osc_pack_capa(req, body, ocapa);
1113 LASSERT(page_count > 0);
/* Register every page with the bulk descriptor and fill/merge the
 * niobuf array, asserting the page array is sorted and gap-free. */
1115 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1116 struct brw_page *pg = pga[i];
1117 int poff = pg->off & ~PAGE_MASK;
1119 LASSERT(pg->count > 0);
1120 /* make sure there is no gap in the middle of page array */
1121 LASSERTF(page_count == 1 ||
1122 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1123 ergo(i > 0 && i < page_count - 1,
1124 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1125 ergo(i == page_count - 1, poff == 0)),
1126 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1127 i, page_count, pg, pg->off, pg->count);
1128 LASSERTF(i == 0 || pg->off > pg_prev->off,
1129 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1130 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1132 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1133 pg_prev->pg, page_private(pg_prev->pg),
1134 pg_prev->pg->index, pg_prev->off);
1135 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1136 (pg->flag & OBD_BRW_SRVLOCK));
1138 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1139 requested_nob += pg->count;
1141 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1143 niobuf->rnb_len += pg->count;
1145 niobuf->rnb_offset = pg->off;
1146 niobuf->rnb_len = pg->count;
1147 niobuf->rnb_flags = pg->flag;
1152 LASSERTF((void *)(niobuf - niocount) ==
1153 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1154 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1155 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1157 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1159 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1160 body->oa.o_valid |= OBD_MD_FLFLAGS;
1161 body->oa.o_flags = 0;
1163 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1166 if (osc_should_shrink_grant(cli))
1167 osc_shrink_grant_local(cli, &body->oa);
1169 /* size[REQ_REC_OFF] still sizeof (*body) */
1170 if (opc == OST_WRITE) {
1171 if (cli->cl_checksum &&
1172 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1173 /* store cl_cksum_type in a local variable since
1174 * it can be changed via lprocfs */
1175 cksum_type_t cksum_type = cli->cl_cksum_type;
1177 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1178 oa->o_flags &= OBD_FL_LOCAL_MASK;
1179 body->oa.o_flags = 0;
1181 body->oa.o_flags |= cksum_type_pack(cksum_type);
1182 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1183 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1187 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1189 /* save this in 'oa', too, for later checking */
1190 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1191 oa->o_flags |= cksum_type_pack(cksum_type);
1193 /* clear out the checksum flag, in case this is a
1194 * resend but cl_checksum is no longer set. b=11238 */
1195 oa->o_valid &= ~OBD_MD_FLCKSUM;
1197 oa->o_cksum = body->oa.o_cksum;
1198 /* 1 RC per niobuf */
1199 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1200 sizeof(__u32) * niocount);
1202 if (cli->cl_checksum &&
1203 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1204 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1205 body->oa.o_flags = 0;
1206 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1207 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1210 ptlrpc_request_set_replen(req);
1212 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1213 aa = ptlrpc_req_async_args(req);
1215 aa->aa_requested_nob = requested_nob;
1216 aa->aa_nio_count = niocount;
1217 aa->aa_page_count = page_count;
1221 INIT_LIST_HEAD(&aa->aa_oaps);
1222 if (ocapa && reserve)
1223 aa->aa_ocapa = capa_get(ocapa);
1226 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1227 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1228 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1229 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1233 ptlrpc_req_finished(req);
/*
 * Diagnose a write-checksum mismatch: if server and client checksums
 * agree, nothing to do. Otherwise recompute the checksum over the
 * still-mapped pages with the server's checksum type and classify the
 * failure (wrong type negotiated, client-side change after checksum
 * e.g. mmap IO, corruption in transit, or both), then log a console
 * error with object/extent details.
 * NOTE(review): the new_cksum/msg declarations, the return statements
 * and a page_count==0 early path are missing from this line-sampled
 * extract.
 */
1237 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1238 __u32 client_cksum, __u32 server_cksum, int nob,
1239 size_t page_count, struct brw_page **pga,
1240 cksum_type_t client_cksum_type)
1244 cksum_type_t cksum_type;
1246 if (server_cksum == client_cksum) {
1247 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1251 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1253 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1256 if (cksum_type != client_cksum_type)
1257 msg = "the server did not use the checksum type specified in "
1258 "the original request - likely a protocol problem";
1259 else if (new_cksum == server_cksum)
1260 msg = "changed on the client after we checksummed it - "
1261 "likely false positive due to mmap IO (bug 11742)";
1262 else if (new_cksum == client_cksum)
1263 msg = "changed in transit before arrival at OST";
1265 msg = "changed in transit AND doesn't match the original - "
1266 "likely false positive due to mmap IO (bug 11742)";
1268 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1269 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1270 msg, libcfs_nid2str(peer->nid),
1271 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1272 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1273 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1274 POSTID(&oa->o_oi), pga[0]->off,
1275 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1276 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1277 "client csum now %x\n", client_cksum, client_cksum_type,
1278 server_cksum, cksum_type, new_cksum);
/*
 * Finish a completed bulk BRW request: update quota over-limit flags and
 * grant accounting from the reply body, verify data integrity (write
 * checksum via check_write_checksum(), read checksum recomputed locally,
 * short-read handling), and copy the server-returned attributes back into
 * the client obdo (lustre_get_wire_obdo).  Recoverable checksum failures
 * surface as -EAGAIN so the caller can resend.
 * NOTE(review): interior lines are elided in this view; comments only were
 * added, code is unchanged.
 */
1282 /* Note rc enters this function as number of bytes transferred */
1283 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1285 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1286 const lnet_process_id_t *peer =
1287 &req->rq_import->imp_connection->c_peer;
1288 struct client_obd *cli = aa->aa_cli;
1289 struct ost_body *body;
1290 u32 client_cksum = 0;
1293 if (rc < 0 && rc != -EDQUOT) {
1294 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1298 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1299 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1301 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1305 /* set/clear over quota flag for a uid/gid */
1306 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1307 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1308 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1310 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1311 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1313 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1316 osc_update_grant(cli, body);
1321 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1322 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1324 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1326 CERROR("Unexpected +ve rc %d\n", rc);
1329 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1331 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1334 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1335 check_write_checksum(&body->oa, peer, client_cksum,
1336 body->oa.o_cksum, aa->aa_requested_nob,
1337 aa->aa_page_count, aa->aa_ppga,
1338 cksum_type_unpack(aa->aa_oa->o_flags)))
1341 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1342 aa->aa_page_count, aa->aa_ppga);
1346 /* The rest of this function executes only for OST_READs */
1348 /* if unwrap_bulk failed, return -EAGAIN to retry */
1349 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1351 GOTO(out, rc = -EAGAIN);
1353 if (rc > aa->aa_requested_nob) {
1354 CERROR("Unexpected rc %d (%d requested)\n", rc,
1355 aa->aa_requested_nob);
1359 if (rc != req->rq_bulk->bd_nob_transferred) {
1360 CERROR ("Unexpected rc %d (%d transferred)\n",
1361 rc, req->rq_bulk->bd_nob_transferred);
1365 if (rc < aa->aa_requested_nob)
1366 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1368 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1369 static int cksum_counter;
1370 u32 server_cksum = body->oa.o_cksum;
1373 cksum_type_t cksum_type;
1375 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1376 body->oa.o_flags : 0);
1377 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1378 aa->aa_ppga, OST_READ,
1381 if (peer->nid != req->rq_bulk->bd_sender) {
1383 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1386 if (server_cksum != client_cksum) {
1387 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1388 "%s%s%s inode "DFID" object "DOSTID
1389 " extent ["LPU64"-"LPU64"]\n",
1390 req->rq_import->imp_obd->obd_name,
1391 libcfs_nid2str(peer->nid),
1393 body->oa.o_valid & OBD_MD_FLFID ?
1394 body->oa.o_parent_seq : (__u64)0,
1395 body->oa.o_valid & OBD_MD_FLFID ?
1396 body->oa.o_parent_oid : 0,
1397 body->oa.o_valid & OBD_MD_FLFID ?
1398 body->oa.o_parent_ver : 0,
1399 POSTID(&body->oa.o_oi),
1400 aa->aa_ppga[0]->off,
1401 aa->aa_ppga[aa->aa_page_count-1]->off +
1402 aa->aa_ppga[aa->aa_page_count-1]->count -
1404 CERROR("client %x, server %x, cksum_type %x\n",
1405 client_cksum, server_cksum, cksum_type);
1407 aa->aa_oa->o_cksum = client_cksum;
1411 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1414 } else if (unlikely(client_cksum)) {
1415 static int cksum_missed;
1418 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1419 CERROR("Checksum %u requested from %s but not sent\n",
1420 cksum_missed, libcfs_nid2str(peer->nid));
1426 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1427 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW request after a recoverable error.  A brand-new
 * request is prepared from the saved async args (same pages, same obdo);
 * the async-page list, extent list and capa are moved (not copied — a
 * list_head cannot be copied) from the old request's args to the new
 * one's, each oap's request reference is swapped to the new request, the
 * resend is delayed proportionally to the resend count (capped at the
 * request timeout), and the new request is queued on a ptlrpcd thread.
 * NOTE(review): interior lines are elided in this view; comments only were
 * added, code is unchanged.
 */
1432 static int osc_brw_redo_request(struct ptlrpc_request *request,
1433 struct osc_brw_async_args *aa, int rc)
1435 struct ptlrpc_request *new_req;
1436 struct osc_brw_async_args *new_aa;
1437 struct osc_async_page *oap;
1440 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1441 "redo for recoverable error %d", rc);
1443 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1444 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1445 aa->aa_cli, aa->aa_oa,
1446 aa->aa_page_count, aa->aa_ppga,
1447 &new_req, aa->aa_ocapa, 0, 1);
1451 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1452 if (oap->oap_request != NULL) {
1453 LASSERTF(request == oap->oap_request,
1454 "request %p != oap_request %p\n",
1455 request, oap->oap_request);
1456 if (oap->oap_interrupted) {
1457 ptlrpc_req_finished(new_req);
1462 /* New request takes over pga and oaps from old request.
1463 * Note that copying a list_head doesn't work, need to move it... */
1465 new_req->rq_interpret_reply = request->rq_interpret_reply;
1466 new_req->rq_async_args = request->rq_async_args;
1467 new_req->rq_commit_cb = request->rq_commit_cb;
1468 /* cap resend delay to the current request timeout, this is similar to
1469 * what ptlrpc does (see after_reply()) */
1470 if (aa->aa_resends > new_req->rq_timeout)
1471 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1473 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1474 new_req->rq_generation_set = 1;
1475 new_req->rq_import_generation = request->rq_import_generation;
1477 new_aa = ptlrpc_req_async_args(new_req);
1479 INIT_LIST_HEAD(&new_aa->aa_oaps);
1480 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1481 INIT_LIST_HEAD(&new_aa->aa_exts);
1482 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1483 new_aa->aa_resends = aa->aa_resends;
1485 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1486 if (oap->oap_request) {
1487 ptlrpc_req_finished(oap->oap_request);
1488 oap->oap_request = ptlrpc_request_addref(new_req);
1492 new_aa->aa_ocapa = aa->aa_ocapa;
1493 aa->aa_ocapa = NULL;
1495 /* XXX: This code will run into problem if we're going to support
1496 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1497 * and wait for all of them to be finished. We should inherit request
1498 * set from old request. */
1499 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1501 DEBUG_REQ(D_INFO, new_req, "new request");
/* Shellsort of the brw_page array keyed on file offset; the gap sequence
 * is Knuth's 3n+1 (see the stride loop below). */
1506 * ugh, we want disk allocation on the target to happen in offset order. we'll
1507 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1508 * fine for our small page arrays and doesn't require allocation. its an
1509 * insertion sort that swaps elements that are strides apart, shrinking the
1510 * stride down until its '1' and the array is sorted.
1512 static void sort_brw_pages(struct brw_page **array, int num)
1515 struct brw_page *tmp;
1519 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1524 for (i = stride ; i < num ; i++) {
1527 while (j >= stride && array[j - stride]->off > tmp->off) {
1528 array[j] = array[j - stride];
1533 } while (stride > 1);
/* Free the brw_page pointer array (allocated with OBD_ALLOC in
 * osc_build_rpc()); @count must match the allocation size. */
1536 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1538 LASSERT(ppga != NULL);
1539 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * rq_interpret_reply callback for BRW RPCs.  Finishes the bulk transfer
 * via osc_brw_fini_request(), resends on recoverable errors (-EINPROGRESS
 * always retries, regardless of resend count), propagates returned object
 * attributes (blocks, a/m/ctime, size and KMS for writes) into the
 * cl_object under the attr lock, finishes every extent attached to the
 * request, drops the in-flight read/write RPC counter under
 * cl_loi_list_lock (before waking cache waiters, so waiters see a correct
 * count), and kicks osc_io_unplug() to start more IO.
 * NOTE(review): interior lines are elided in this view; comments only were
 * added, code is unchanged.
 */
1542 static int brw_interpret(const struct lu_env *env,
1543 struct ptlrpc_request *req, void *data, int rc)
1545 struct osc_brw_async_args *aa = data;
1546 struct osc_extent *ext;
1547 struct osc_extent *tmp;
1548 struct client_obd *cli = aa->aa_cli;
1551 rc = osc_brw_fini_request(req, rc);
1552 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1553 /* When server return -EINPROGRESS, client should always retry
1554 * regardless of the number of times the bulk was resent already. */
1555 if (osc_recoverable_error(rc)) {
1556 if (req->rq_import_generation !=
1557 req->rq_import->imp_generation) {
1558 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1559 ""DOSTID", rc = %d.\n",
1560 req->rq_import->imp_obd->obd_name,
1561 POSTID(&aa->aa_oa->o_oi), rc);
1562 } else if (rc == -EINPROGRESS ||
1563 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1564 rc = osc_brw_redo_request(req, aa, rc);
1566 CERROR("%s: too many resent retries for object: "
1567 ""LPU64":"LPU64", rc = %d.\n",
1568 req->rq_import->imp_obd->obd_name,
1569 POSTID(&aa->aa_oa->o_oi), rc);
1574 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1579 capa_put(aa->aa_ocapa);
1580 aa->aa_ocapa = NULL;
1584 struct obdo *oa = aa->aa_oa;
1585 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1586 unsigned long valid = 0;
1587 struct cl_object *obj;
1588 struct osc_async_page *last;
1590 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1591 obj = osc2cl(last->oap_obj);
1593 cl_object_attr_lock(obj);
1594 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1595 attr->cat_blocks = oa->o_blocks;
1596 valid |= CAT_BLOCKS;
1598 if (oa->o_valid & OBD_MD_FLMTIME) {
1599 attr->cat_mtime = oa->o_mtime;
1602 if (oa->o_valid & OBD_MD_FLATIME) {
1603 attr->cat_atime = oa->o_atime;
1606 if (oa->o_valid & OBD_MD_FLCTIME) {
1607 attr->cat_ctime = oa->o_ctime;
1611 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1612 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1613 loff_t last_off = last->oap_count + last->oap_obj_off +
1616 /* Change file size if this is an out of quota or
1617 * direct IO write and it extends the file size */
1618 if (loi->loi_lvb.lvb_size < last_off) {
1619 attr->cat_size = last_off;
1622 /* Extend KMS if it's not a lockless write */
1623 if (loi->loi_kms < last_off &&
1624 oap2osc_page(last)->ops_srvlock == 0) {
1625 attr->cat_kms = last_off;
1631 cl_object_attr_update(env, obj, attr, valid);
1632 cl_object_attr_unlock(obj);
1634 OBDO_FREE(aa->aa_oa);
1636 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1637 osc_inc_unstable_pages(req);
1639 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1640 list_del_init(&ext->oe_link);
1641 osc_extent_finish(env, ext, 1, rc);
1643 LASSERT(list_empty(&aa->aa_exts));
1644 LASSERT(list_empty(&aa->aa_oaps));
1646 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1647 req->rq_bulk->bd_nob_transferred);
1648 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1649 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1651 spin_lock(&cli->cl_loi_list_lock);
1652 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1653 * is called so we know whether to go to sync BRWs or wait for more
1654 * RPCs to complete */
1655 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1656 cli->cl_w_in_flight--;
1658 cli->cl_r_in_flight--;
1659 osc_wake_cache_waiters(cli);
1660 spin_unlock(&cli->cl_loi_list_lock);
1662 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * rq_commit_cb for BRW requests: once the server transaction is committed,
 * clear rq_unstable and drop the unstable-pages accounting.  The flag is
 * tested and cleared under rq_lock so this cannot race with
 * osc_inc_unstable_pages() (called from osc_extent_finish()) and leak the
 * unstable-page count; if the flag was never set, only rq_committed is
 * recorded.
 */
1666 static void brw_commit(struct ptlrpc_request *req)
1668 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1669 * this called via the rq_commit_cb, I need to ensure
1670 * osc_dec_unstable_pages is still called. Otherwise unstable
1671 * pages may be leaked. */
1672 spin_lock(&req->rq_lock);
1673 if (likely(req->rq_unstable)) {
1674 req->rq_unstable = 0;
1675 spin_unlock(&req->rq_lock);
1677 osc_dec_unstable_pages(req);
1679 req->rq_committed = 1;
1680 spin_unlock(&req->rq_lock);
/*
 * Assemble and dispatch one BRW RPC from a list of OES_RPC extents:
 * collect the async pages into rpc_list tracking the extent byte range,
 * allocate the page array / obdo / request attributes, build the request
 * (osc_brw_prep_request) with brw_commit/brw_interpret callbacks, refresh
 * the timestamps in the request body after it is built (setattr race,
 * bug 10150), move the page and extent lists into the request's async
 * args, update the read/write in-flight counters and lprocfs histograms,
 * and hand the request to a ptlrpcd thread.  On failure every extent is
 * finished with the error and all allocations are released.
 * NOTE(review): interior lines are elided in this view; comments only were
 * added, code is unchanged.
 */
1685 * Build an RPC by the list of extent @ext_list. The caller must ensure
1686 * that the total pages in this list are NOT over max pages per RPC.
1687 * Extents in the list must be in OES_RPC state.
1689 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1690 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1692 struct ptlrpc_request *req = NULL;
1693 struct osc_extent *ext;
1694 struct brw_page **pga = NULL;
1695 struct osc_brw_async_args *aa = NULL;
1696 struct obdo *oa = NULL;
1697 struct osc_async_page *oap;
1698 struct osc_async_page *tmp;
1699 struct cl_req *clerq = NULL;
1700 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1702 struct cl_req_attr *crattr = NULL;
1703 loff_t starting_offset = OBD_OBJECT_EOF;
1704 loff_t ending_offset = 0;
1708 bool soft_sync = false;
1711 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1712 struct ost_body *body;
1714 LASSERT(!list_empty(ext_list));
1716 /* add pages into rpc_list to build BRW rpc */
1717 list_for_each_entry(ext, ext_list, oe_link) {
1718 LASSERT(ext->oe_state == OES_RPC);
1719 mem_tight |= ext->oe_memalloc;
1720 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1722 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1723 if (starting_offset == OBD_OBJECT_EOF ||
1724 starting_offset > oap->oap_obj_off)
1725 starting_offset = oap->oap_obj_off;
1727 LASSERT(oap->oap_page_off == 0);
1728 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1729 ending_offset = oap->oap_obj_off +
1732 LASSERT(oap->oap_page_off + oap->oap_count ==
1737 soft_sync = osc_over_unstable_soft_limit(cli);
1739 mpflag = cfs_memory_pressure_get_and_set();
1741 OBD_ALLOC(crattr, sizeof(*crattr));
1743 GOTO(out, rc = -ENOMEM);
1745 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1747 GOTO(out, rc = -ENOMEM);
1751 GOTO(out, rc = -ENOMEM);
1754 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1755 struct cl_page *page = oap2cl_page(oap);
1756 if (clerq == NULL) {
1757 clerq = cl_req_alloc(env, page, crt,
1758 1 /* only 1-object rpcs for now */);
1760 GOTO(out, rc = PTR_ERR(clerq));
1763 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1765 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1766 pga[i] = &oap->oap_brw_page;
1767 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1768 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1769 pga[i]->pg, page_index(oap->oap_page), oap,
1772 cl_req_page_add(env, clerq, page);
1775 /* always get the data for the obdo for the rpc */
1776 LASSERT(clerq != NULL);
1777 crattr->cra_oa = oa;
1778 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1780 rc = cl_req_prep(env, clerq);
1782 CERROR("cl_req_prep failed: %d\n", rc);
1786 sort_brw_pages(pga, page_count);
1787 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req,
1788 crattr->cra_capa, 1, 0);
1790 CERROR("prep_req failed: %d\n", rc);
1794 req->rq_commit_cb = brw_commit;
1795 req->rq_interpret_reply = brw_interpret;
1798 req->rq_memalloc = 1;
1800 /* Need to update the timestamps after the request is built in case
1801 * we race with setattr (locally or in queue at OST). If OST gets
1802 * later setattr before earlier BRW (as determined by the request xid),
1803 * the OST will not use BRW timestamps. Sadly, there is no obvious
1804 * way to do this in a single call. bug 10150 */
1805 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1806 crattr->cra_oa = &body->oa;
1807 cl_req_attr_set(env, clerq, crattr,
1808 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1810 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1812 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1813 aa = ptlrpc_req_async_args(req);
1814 INIT_LIST_HEAD(&aa->aa_oaps);
1815 list_splice_init(&rpc_list, &aa->aa_oaps);
1816 INIT_LIST_HEAD(&aa->aa_exts);
1817 list_splice_init(ext_list, &aa->aa_exts);
1818 aa->aa_clerq = clerq;
1820 /* queued sync pages can be torn down while the pages
1821 * were between the pending list and the rpc */
1823 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1824 /* only one oap gets a request reference */
1827 if (oap->oap_interrupted && !req->rq_intr) {
1828 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1830 ptlrpc_mark_interrupted(req);
1834 tmp->oap_request = ptlrpc_request_addref(req);
1836 spin_lock(&cli->cl_loi_list_lock);
1837 starting_offset >>= PAGE_CACHE_SHIFT;
1838 if (cmd == OBD_BRW_READ) {
1839 cli->cl_r_in_flight++;
1840 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1841 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1842 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1843 starting_offset + 1);
1845 cli->cl_w_in_flight++;
1846 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1847 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1848 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1849 starting_offset + 1);
1851 spin_unlock(&cli->cl_loi_list_lock);
1853 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1854 page_count, aa, cli->cl_r_in_flight,
1855 cli->cl_w_in_flight);
1857 /* XXX: Maybe the caller can check the RPC bulk descriptor to
1858 * see which CPU/NUMA node the majority of pages were allocated
1859 * on, and try to assign the async RPC to the CPU core
1860 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1862 * But on the other hand, we expect that multiple ptlrpcd
1863 * threads and the initial write sponsor can run in parallel,
1864 * especially when data checksum is enabled, which is CPU-bound
1865 * operation and single ptlrpcd thread cannot process in time.
1866 * So more ptlrpcd threads sharing BRW load
1867 * (with PDL_POLICY_ROUND) seems better.
1869 ptlrpcd_add_req(req, pol, -1);
1875 cfs_memory_pressure_restore(mpflag);
1877 if (crattr != NULL) {
1878 capa_put(crattr->cra_capa);
1879 OBD_FREE(crattr, sizeof(*crattr));
1883 LASSERT(req == NULL);
1888 OBD_FREE(pga, sizeof(*pga) * page_count);
1889 /* this should happen rarely and is pretty bad, it makes the
1890 * pending list not follow the dirty order */
1891 while (!list_empty(ext_list)) {
1892 ext = list_entry(ext_list->next, struct osc_extent,
1894 list_del_init(&ext->oe_link);
1895 osc_extent_finish(env, ext, 0, rc);
1897 if (clerq && !IS_ERR(clerq))
1898 cl_req_completion(env, clerq, rc);
/*
 * Attach @einfo->ei_cbdata to @lock's l_ast_data if it is not yet set, and
 * check (under the resource lock) that the lock now carries exactly the
 * caller's data; the LASSERTs verify that the lock's AST callbacks and
 * resource type match what the caller expects.  Returns whether
 * l_ast_data == ei_cbdata (return value set in elided lines).
 */
1903 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1904 struct ldlm_enqueue_info *einfo)
1906 void *data = einfo->ei_cbdata;
1909 LASSERT(lock != NULL);
1910 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1911 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1912 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1913 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1915 lock_res_and_lock(lock);
1917 if (lock->l_ast_data == NULL)
1918 lock->l_ast_data = data;
1919 if (lock->l_ast_data == data)
1922 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve the
 * lock from @lockh, attach/verify the AST data, and drop the lock
 * reference.  If the handle no longer resolves to a lock, log an error —
 * the client was most likely evicted.
 */
1927 static int osc_set_data_with_check(struct lustre_handle *lockh,
1928 struct ldlm_enqueue_info *einfo)
1930 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1934 set = osc_set_lock_data_with_check(lock, einfo);
1935 LDLM_LOCK_PUT(lock);
1937 CERROR("lockh %p, data %p - client evicted?\n",
1938 lockh, einfo->ei_cbdata);
/*
 * Common completion for an OSC lock enqueue: for an aborted intent
 * enqueue, extract the real status from the DLM reply's lock_policy_res1;
 * mark the LVB ready on success; invoke the caller's upcall with the
 * final error code; and release the lock reference taken by
 * ldlm_cli_enqueue() (for both OK and MATCHED outcomes).
 */
1942 static int osc_enqueue_fini(struct ptlrpc_request *req,
1943 osc_enqueue_upcall_f upcall, void *cookie,
1944 struct lustre_handle *lockh, ldlm_mode_t mode,
1945 __u64 *flags, int agl, int errcode)
1947 bool intent = *flags & LDLM_FL_HAS_INTENT;
1951 /* The request was created before ldlm_cli_enqueue call. */
1952 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1953 struct ldlm_reply *rep;
1955 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1956 LASSERT(rep != NULL);
1958 rep->lock_policy_res1 =
1959 ptlrpc_status_ntoh(rep->lock_policy_res1);
1960 if (rep->lock_policy_res1)
1961 errcode = rep->lock_policy_res1;
1963 *flags |= LDLM_FL_LVB_READY;
1964 } else if (errcode == ELDLM_OK) {
1965 *flags |= LDLM_FL_LVB_READY;
1968 /* Call the update callback. */
1969 rc = (*upcall)(cookie, lockh, errcode);
1971 /* release the reference taken in ldlm_cli_enqueue() */
1972 if (errcode == ELDLM_LOCK_MATCHED)
1974 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1975 ldlm_lock_decref(lockh, mode);
/*
 * Interpret callback for an asynchronous lock enqueue.  Re-resolves the
 * lock from the saved handle, takes an extra reference so a blocking AST
 * posted by ldlm_cli_enqueue_fini() for a failed lock cannot overtake the
 * upcall, completes the DLM side (ldlm_cli_enqueue_fini) and then the OSC
 * side (osc_enqueue_fini), and finally drops the extra reference.  The two
 * OBD_FAIL_TIMEOUT points exercise enqueue/cancel races in testing.
 */
1980 static int osc_enqueue_interpret(const struct lu_env *env,
1981 struct ptlrpc_request *req,
1982 struct osc_enqueue_args *aa, int rc)
1984 struct ldlm_lock *lock;
1985 struct lustre_handle *lockh = &aa->oa_lockh;
1986 ldlm_mode_t mode = aa->oa_mode;
1987 struct ost_lvb *lvb = aa->oa_lvb;
1988 __u32 lvb_len = sizeof(*lvb);
1993 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1995 lock = ldlm_handle2lock(lockh);
1996 LASSERTF(lock != NULL,
1997 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1998 lockh->cookie, req, aa);
2000 /* Take an additional reference so that a blocking AST that
2001 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2002 * to arrive after an upcall has been executed by
2003 * osc_enqueue_fini(). */
2004 ldlm_lock_addref(lockh, mode);
2006 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2007 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2009 /* Let CP AST to grant the lock first. */
2010 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2013 LASSERT(aa->oa_lvb == NULL);
2014 LASSERT(aa->oa_flags == NULL);
2015 aa->oa_flags = &flags;
2018 /* Complete obtaining the lock procedure. */
2019 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2020 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2022 /* Complete osc stuff. */
2023 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2024 aa->oa_flags, aa->oa_agl, rc);
2026 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2028 ldlm_lock_decref(lockh, mode);
2029 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: when passed as @rqset to
 * osc_enqueue_base(), the request is handed to a ptlrpcd daemon instead of
 * being added to a caller-owned set (see the rqset == PTLRPCD_SET check
 * there).  Never dereferenced. */
2033 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * Enqueue an extent lock on @res_id.  The extent is first rounded out to
 * page boundaries; then an existing compatible granted lock is searched
 * for (a PR request may also reuse a PW lock), and if one matches, the
 * caller's upcall runs with ELDLM_LOCK_MATCHED instead of issuing an RPC.
 * Otherwise an LDLM enqueue is performed, synchronously or — when @async —
 * completed later through osc_enqueue_interpret(); @rqset may be the
 * PTLRPCD_SET sentinel to route the request to a ptlrpcd daemon.  AGL
 * (@agl) enqueues are speculative: a pre-existing lock simply cancels the
 * AGL attempt, and the async args carry no lvb/flags pointers.
 */
2035 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2036 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2037 * other synchronous requests, however keeping some locks and trying to obtain
2038 * others may take a considerable amount of time in a case of ost failure; and
2039 * when other sync requests do not get released lock from a client, the client
2040 * is evicted from the cluster -- such scenarious make the life difficult, so
2041 * release locks just after they are obtained. */
2042 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2043 __u64 *flags, ldlm_policy_data_t *policy,
2044 struct ost_lvb *lvb, int kms_valid,
2045 osc_enqueue_upcall_f upcall, void *cookie,
2046 struct ldlm_enqueue_info *einfo,
2047 struct ptlrpc_request_set *rqset, int async, int agl)
2049 struct obd_device *obd = exp->exp_obd;
2050 struct lustre_handle lockh = { 0 };
2051 struct ptlrpc_request *req = NULL;
2052 int intent = *flags & LDLM_FL_HAS_INTENT;
2053 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2058 /* Filesystem lock extents are extended to page boundaries so that
2059 * dealing with the page cache is a little smoother. */
2060 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2061 policy->l_extent.end |= ~PAGE_MASK;
2064 * kms is not valid when either object is completely fresh (so that no
2065 * locks are cached), or object was evicted. In the latter case cached
2066 * lock cannot be used, because it would prime inode state with
2067 * potentially stale LVB.
2072 /* Next, search for already existing extent locks that will cover us */
2073 /* If we're trying to read, we also search for an existing PW lock. The
2074 * VFS and page cache already protect us locally, so lots of readers/
2075 * writers can share a single PW lock.
2077 * There are problems with conversion deadlocks, so instead of
2078 * converting a read lock to a write lock, we'll just enqueue a new
2081 * At some point we should cancel the read lock instead of making them
2082 * send us a blocking callback, but there are problems with canceling
2083 * locks out from other users right now, too. */
2084 mode = einfo->ei_mode;
2085 if (einfo->ei_mode == LCK_PR)
2087 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2088 einfo->ei_type, policy, mode, &lockh, 0);
2090 struct ldlm_lock *matched;
2092 if (*flags & LDLM_FL_TEST_LOCK)
2095 matched = ldlm_handle2lock(&lockh);
2097 /* AGL enqueues DLM locks speculatively. Therefore if
2098 * it already exists a DLM lock, it wll just inform the
2099 * caller to cancel the AGL process for this stripe. */
2100 ldlm_lock_decref(&lockh, mode);
2101 LDLM_LOCK_PUT(matched);
2103 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2104 *flags |= LDLM_FL_LVB_READY;
2106 /* We already have a lock, and it's referenced. */
2107 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2109 ldlm_lock_decref(&lockh, mode);
2110 LDLM_LOCK_PUT(matched);
2113 ldlm_lock_decref(&lockh, mode);
2114 LDLM_LOCK_PUT(matched);
2119 if (*flags & LDLM_FL_TEST_LOCK)
2123 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2124 &RQF_LDLM_ENQUEUE_LVB);
2128 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2130 ptlrpc_request_free(req);
2134 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2136 ptlrpc_request_set_replen(req);
2139 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2140 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2142 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2143 sizeof(*lvb), LVB_T_OST, &lockh, async);
2146 struct osc_enqueue_args *aa;
2147 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2148 aa = ptlrpc_req_async_args(req);
2150 aa->oa_mode = einfo->ei_mode;
2151 aa->oa_type = einfo->ei_type;
2152 lustre_handle_copy(&aa->oa_lockh, &lockh);
2153 aa->oa_upcall = upcall;
2154 aa->oa_cookie = cookie;
2157 aa->oa_flags = flags;
2160 /* AGL is essentially to enqueue an DLM lock
2161 * in advance, so we don't care about the
2162 * result of AGL enqueue. */
2164 aa->oa_flags = NULL;
2167 req->rq_interpret_reply =
2168 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2169 if (rqset == PTLRPCD_SET)
2170 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2172 ptlrpc_set_add_req(rqset, req);
2173 } else if (intent) {
2174 ptlrpc_req_finished(req);
2179 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2182 ptlrpc_req_finished(req);
/*
 * Look for an already-granted extent lock covering @policy (rounded out
 * to page boundaries).  A read request may also match an existing PW
 * lock; in that case the PW reference obtained by the match is converted
 * to a PR reference (addref PR, decref PW) unless this is only a
 * TEST_LOCK probe.  Attaches @data to the matched lock via
 * osc_set_data_with_check(); on data mismatch the reference is dropped.
 */
2187 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2188 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2189 __u64 *flags, void *data, struct lustre_handle *lockh,
2192 struct obd_device *obd = exp->exp_obd;
2193 __u64 lflags = *flags;
2197 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2200 /* Filesystem lock extents are extended to page boundaries so that
2201 * dealing with the page cache is a little smoother */
2202 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2203 policy->l_extent.end |= ~PAGE_MASK;
2205 /* Next, search for already existing extent locks that will cover us */
2206 /* If we're trying to read, we also search for an existing PW lock. The
2207 * VFS and page cache already protect us locally, so lots of readers/
2208 * writers can share a single PW lock. */
2212 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2213 res_id, type, policy, rc, lockh, unref);
2216 if (!osc_set_data_with_check(lockh, data)) {
2217 if (!(lflags & LDLM_FL_TEST_LOCK))
2218 ldlm_lock_decref(lockh, rc);
2222 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2223 ldlm_lock_addref(lockh, LCK_PR);
2224 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Interpret callback for an asynchronous OST_STATFS request: unpack the
 * server's obd_statfs reply, copy it into the caller's oi_osfs buffer and
 * invoke the oi_cb_up completion callback.  -ENOTCONN/-EAGAIN get special
 * handling when the caller passed OBD_STATFS_NODELAY (handling is in
 * elided lines).
 */
2231 static int osc_statfs_interpret(const struct lu_env *env,
2232 struct ptlrpc_request *req,
2233 struct osc_async_args *aa, int rc)
2235 struct obd_statfs *msfs;
2239 /* The request has in fact never been sent
2240 * due to issues at a higher level (LOV).
2241 * Exit immediately since the caller is
2242 * aware of the problem and takes care
2243 * of the clean up */
2246 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2247 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2253 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2255 GOTO(out, rc = -EPROTO);
2258 *aa->aa_oi->oi_osfs = *msfs;
2260 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an OST_STATFS request asynchronously on the caller's request set;
 * the reply is processed by osc_statfs_interpret().  With
 * OBD_STATFS_NODELAY the request is marked no-resend/no-delay so procfs
 * readers cannot block behind recovery.  @max_age is currently not sent
 * in the request (see the comment below).
 */
2264 static int osc_statfs_async(struct obd_export *exp,
2265 struct obd_info *oinfo, __u64 max_age,
2266 struct ptlrpc_request_set *rqset)
2268 struct obd_device *obd = class_exp2obd(exp);
2269 struct ptlrpc_request *req;
2270 struct osc_async_args *aa;
2274 /* We could possibly pass max_age in the request (as an absolute
2275 * timestamp or a "seconds.usec ago") so the target can avoid doing
2276 * extra calls into the filesystem if that isn't necessary (e.g.
2277 * during mount that would help a bit). Having relative timestamps
2278 * is not so great if request processing is slow, while absolute
2279 * timestamps are not ideal because they need time synchronization. */
2280 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2284 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2286 ptlrpc_request_free(req);
2289 ptlrpc_request_set_replen(req);
2290 req->rq_request_portal = OST_CREATE_PORTAL;
2291 ptlrpc_at_set_req_timeout(req);
2293 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2294 /* procfs requests not want stat in wait for avoid deadlock */
2295 req->rq_no_resend = 1;
2296 req->rq_no_delay = 1;
2299 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2300 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2301 aa = ptlrpc_req_async_args(req);
2304 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS.  Takes a reference on the import under cl_sem
 * to serialize with client_disconnect_export (bug 15684, since the call
 * may come from lprocfs), sends the request with ptlrpc_queue_wait(), and
 * unpacks the obd_statfs reply (copy into @osfs is in elided lines).
 * OBD_STATFS_NODELAY marks the request no-resend/no-delay for procfs
 * callers.
 */
2308 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2309 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2311 struct obd_device *obd = class_exp2obd(exp);
2312 struct obd_statfs *msfs;
2313 struct ptlrpc_request *req;
2314 struct obd_import *imp = NULL;
2318 /*Since the request might also come from lprocfs, so we need
2319 *sync this with client_disconnect_export Bug15684*/
2320 down_read(&obd->u.cli.cl_sem);
2321 if (obd->u.cli.cl_import)
2322 imp = class_import_get(obd->u.cli.cl_import);
2323 up_read(&obd->u.cli.cl_sem);
2327 /* We could possibly pass max_age in the request (as an absolute
2328 * timestamp or a "seconds.usec ago") so the target can avoid doing
2329 * extra calls into the filesystem if that isn't necessary (e.g.
2330 * during mount that would help a bit). Having relative timestamps
2331 * is not so great if request processing is slow, while absolute
2332 * timestamps are not ideal because they need time synchronization. */
2333 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2335 class_import_put(imp);
2340 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2342 ptlrpc_request_free(req);
2345 ptlrpc_request_set_replen(req);
2346 req->rq_request_portal = OST_CREATE_PORTAL;
2347 ptlrpc_at_set_req_timeout(req);
2349 if (flags & OBD_STATFS_NODELAY) {
2350 /* procfs requests not want stat in wait for avoid deadlock */
2351 req->rq_no_resend = 1;
2352 req->rq_no_delay = 1;
2355 rc = ptlrpc_queue_wait(req);
2359 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2361 GOTO(out, rc = -EPROTO);
2368 ptlrpc_req_finished(req);
/*
 * ioctl dispatcher for the OSC device.  Pins the module for the duration
 * of the call, then handles: OBD_IOC_CLIENT_RECOVER (force import
 * recovery), IOC_OSC_SET_ACTIVE (activate/deactivate the import) and
 * OBD_IOC_PING_TARGET (ping the OST).  Unrecognised commands return
 * -ENOTTY.
 */
2372 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2373 void *karg, void *uarg)
2375 struct obd_device *obd = exp->exp_obd;
2376 struct obd_ioctl_data *data = karg;
2380 if (!try_module_get(THIS_MODULE)) {
2381 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2382 module_name(THIS_MODULE));
2386 case OBD_IOC_CLIENT_RECOVER:
2387 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2388 data->ioc_inlbuf1, 0);
2392 case IOC_OSC_SET_ACTIVE:
2393 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2396 case OBD_IOC_PING_TARGET:
2397 err = ptlrpc_obd_ping(obd);
2400 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2401 cmd, current_comm());
2402 GOTO(out, err = -ENOTTY);
2405 module_put(THIS_MODULE);
/*
 * osc_set_info_async() - set a named parameter on this OSC, handling some
 * keys locally (checksum, sptlrpc conf, ctx flush, cache set/shrink) and
 * forwarding the rest to the OST as an OST_SET_INFO RPC.  KEY_GRANT_SHRINK
 * requests are dispatched through ptlrpcd with a grant-interpret callback;
 * all other forwarded keys require a caller-provided request set.
 *
 * NOTE(review): several lines (returns, braces, RC checks) are elided in
 * this extract.
 */
2409 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2410 u32 keylen, void *key,
2411 u32 vallen, void *val,
2412 struct ptlrpc_request_set *set)
2414 struct ptlrpc_request *req;
2415 struct obd_device *obd = exp->exp_obd;
2416 struct obd_import *imp = class_exp2cliimp(exp);
2421 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* checksum on/off is handled purely on the client side */
2423 if (KEY_IS(KEY_CHECKSUM)) {
2424 if (vallen != sizeof(int))
2426 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2430 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2431 sptlrpc_conf_client_adapt(obd);
2435 if (KEY_IS(KEY_FLUSH_CTX)) {
2436 sptlrpc_import_flush_my_ctx(imp);
2440 if (KEY_IS(KEY_CACHE_SET)) {
2441 struct client_obd *cli = &obd->u.cli;
/* the shared client cache may only be attached once per OSC */
2443 LASSERT(cli->cl_cache == NULL); /* only once */
2444 cli->cl_cache = (struct cl_client_cache *)val;
2445 cl_cache_incref(cli->cl_cache);
2446 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2448 /* add this osc into entity list */
2449 LASSERT(list_empty(&cli->cl_lru_osc));
2450 spin_lock(&cli->cl_cache->ccc_lru_lock);
2451 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2452 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2457 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2458 struct client_obd *cli = &obd->u.cli;
/* shrink at most half of our in-list LRU pages, capped by target */
2459 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2460 long target = *(long *)val;
2462 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* every forwarded key except grant-shrink needs a caller request set */
2467 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2470 /* We pass all other commands directly to OST. Since nobody calls osc
2471 methods directly and everybody is supposed to go through LOV, we
2472 assume lov checked invalid values for us.
2473 The only recognised values so far are evict_by_nid and mds_conn.
2474 Even if something bad goes through, we'd get a -EINVAL from OST
anyway. */
2477 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2478 &RQF_OST_SET_GRANT_INFO :
2483 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2484 RCL_CLIENT, keylen);
2485 if (!KEY_IS(KEY_GRANT_SHRINK))
2486 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2487 RCL_CLIENT, vallen);
2488 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2490 ptlrpc_request_free(req);
/* copy key (and value) into the request buffers */
2494 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2495 memcpy(tmp, key, keylen);
2496 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2499 memcpy(tmp, val, vallen);
2501 if (KEY_IS(KEY_GRANT_SHRINK)) {
2502 struct osc_grant_args *aa;
/* stash async args in the request's built-in scratch space */
2505 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2506 aa = ptlrpc_req_async_args(req);
2509 ptlrpc_req_finished(req);
2512 *oa = ((struct ost_body *)val)->oa;
2514 req->rq_interpret_reply = osc_shrink_grant_interpret;
2517 ptlrpc_request_set_replen(req);
2518 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2519 LASSERT(set != NULL);
2520 ptlrpc_set_add_req(set, req);
2521 ptlrpc_check_set(NULL, set);
/* grant shrink goes through the ptlrpcd daemon instead of a set */
2523 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * osc_reconnect() - recompute the grant to request from the server when
 * reconnecting: available grant plus dirty pages, or a 2x BRW-size
 * default when that sum is zero.  Lost grant is reported and reset.
 *
 * NOTE(review): the trailing parameter(s) of the signature and the
 * closing return are elided in this extract.
 */
2528 static int osc_reconnect(const struct lu_env *env,
2529 struct obd_export *exp, struct obd_device *obd,
2530 struct obd_uuid *cluuid,
2531 struct obd_connect_data *data,
2534 struct client_obd *cli = &obd->u.cli;
2536 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* snapshot grant accounting under the LOI list lock */
2539 spin_lock(&cli->cl_loi_list_lock);
2540 data->ocd_grant = (cli->cl_avail_grant +
2541 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2542 2 * cli_brw_size(obd);
2543 lost_grant = cli->cl_lost_grant;
2544 cli->cl_lost_grant = 0;
2545 spin_unlock(&cli->cl_loi_list_lock);
2547 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2548 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2549 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect() - disconnect the export, then remove this client from
 * the grant-shrink list only once the import is known to be gone.
 */
2555 static int osc_disconnect(struct obd_export *exp)
2557 struct obd_device *obd = class_exp2obd(exp);
2560 rc = client_disconnect_export(exp);
/*
2562 * Initially we put del_shrink_grant before disconnect_export, but it
2563 * causes the following problem if setup (connect) and cleanup
2564 * (disconnect) are tangled together.
2565 * connect p1 disconnect p2
2566 * ptlrpc_connect_import
2567 * ............... class_manual_cleanup
2570 * ptlrpc_connect_interrupt
2572 * add this client to shrink list
2574 * Bang! pinger trigger the shrink.
2575 * So the osc should be disconnected from the shrink list, after we
2576 * are sure the import has been destroyed. BUG18662
 */
2578 if (obd->u.cli.cl_import == NULL)
2579 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_import_event() - react to import state changes: reset grant on
 * disconnect, flush pages and clean the namespace on invalidate, init
 * grant and the request portal on OCD, and forward the remaining events
 * to the observer.
 *
 * NOTE(review): the switch header, break statements and some RC checks
 * are elided in this extract.
 */
2583 static int osc_import_event(struct obd_device *obd,
2584 struct obd_import *imp,
2585 enum obd_import_event event)
2587 struct client_obd *cli;
2591 LASSERT(imp->imp_obd == obd);
2594 case IMP_EVENT_DISCON: {
/* on disconnect all granted space is void; reset the counters */
2596 spin_lock(&cli->cl_loi_list_lock);
2597 cli->cl_avail_grant = 0;
2598 cli->cl_lost_grant = 0;
2599 spin_unlock(&cli->cl_loi_list_lock);
2602 case IMP_EVENT_INACTIVE: {
2603 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2606 case IMP_EVENT_INVALIDATE: {
2607 struct ldlm_namespace *ns = obd->obd_namespace;
2611 env = cl_env_get(&refcheck);
2615 /* all pages go to failing rpcs due to the invalid import */
2617 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
/* drop all locks locally; the server state is gone anyway */
2619 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2620 cl_env_put(env, &refcheck);
2625 case IMP_EVENT_ACTIVE: {
2626 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2629 case IMP_EVENT_OCD: {
2630 struct obd_connect_data *ocd = &imp->imp_connect_data;
2632 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2633 osc_init_grant(&obd->u.cli, ocd);
2636 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2637 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2639 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2642 case IMP_EVENT_DEACTIVATE: {
2643 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2646 case IMP_EVENT_ACTIVATE: {
2647 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2651 CERROR("Unknown import event %d\n", event);
/*
2658 * Determine whether the lock can be canceled before replaying the lock
2659 * during recovery, see bug16774 for detailed information.
 *
2661 * \retval zero the lock can't be canceled
2662 * \retval other ok to cancel
 */
2664 static int osc_cancel_weight(struct ldlm_lock *lock)
/*
2667 * Cancel all unused and granted extent lock.
 */
2669 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2670 lock->l_granted_mode == lock->l_req_mode &&
2671 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback: flush pending writeback for
 * the client obd passed as opaque @data.  NOTE(review): the return
 * statement is elided in this extract.
 */
2677 static int brw_queue_work(const struct lu_env *env, void *data)
2679 struct client_obd *cli = data;
2681 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2683 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * osc_setup() - initialize an OSC obd device: client obd setup, ptlrpcd
 * writeback and LRU work items, quota, lprocfs tree, request pool and
 * grant-shrink registration.  On failure the work items and the client
 * obd are torn down again (error labels elided in this extract).
 */
2687 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2689 struct client_obd *cli = &obd->u.cli;
2690 struct obd_type *type;
2695 rc = ptlrpcd_addref();
2699 rc = client_obd_setup(obd, lcfg);
2701 GOTO(out_ptlrpcd, rc);
/* asynchronous writeback is driven by a dedicated ptlrpcd work item */
2703 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2704 if (IS_ERR(handler))
2705 GOTO(out_client_setup, rc = PTR_ERR(handler));
2706 cli->cl_writeback_work = handler;
2708 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2709 if (IS_ERR(handler))
2710 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2711 cli->cl_lru_work = handler;
2713 rc = osc_quota_setup(obd);
2715 GOTO(out_ptlrpcd_work, rc);
2717 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2719 #ifdef CONFIG_PROC_FS
2720 obd->obd_vars = lprocfs_osc_obd_vars;
2722 /* If this is true then both client (osc) and server (osp) are on the
2723 * same node. The osp layer if loaded first will register the osc proc
2724 * directory. In that case this obd_device will be attached its proc
2725 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2726 type = class_search_type(LUSTRE_OSP_NAME);
2727 if (type && type->typ_procsym) {
2728 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2730 obd->obd_vars, obd);
2731 if (IS_ERR(obd->obd_proc_entry)) {
2732 rc = PTR_ERR(obd->obd_proc_entry);
/* proc failure is non-fatal: log it and continue without proc */
2733 CERROR("error %d setting up lprocfs for %s\n", rc,
2735 obd->obd_proc_entry = NULL;
2738 rc = lprocfs_obd_setup(obd);
2741 /* If the basic OSC proc tree construction succeeded then
2742 * lets do the rest. */
2744 lproc_osc_attach_seqstat(obd);
2745 sptlrpc_lprocfs_cliobd_attach(obd);
2746 ptlrpc_lprocfs_register_obd(obd);
2749 /* We need to allocate a few requests more, because
2750 * brw_interpret tries to create new requests before freeing
2751 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2752 * reserved, but I'm afraid that might be too much wasted RAM
2753 * in fact, so 2 is just my guess and still should work. */
2754 cli->cl_import->imp_rq_pool =
2755 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2757 ptlrpc_add_rqs_to_pool);
2759 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2760 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* error path: tear down work items and the client obd in reverse order */
2764 if (cli->cl_writeback_work != NULL) {
2765 ptlrpcd_destroy_work(cli->cl_writeback_work);
2766 cli->cl_writeback_work = NULL;
2768 if (cli->cl_lru_work != NULL) {
2769 ptlrpcd_destroy_work(cli->cl_lru_work);
2770 cli->cl_lru_work = NULL;
2773 client_obd_cleanup(obd);
/*
 * osc_precleanup() - staged teardown: at OBD_CLEANUP_EARLY deactivate the
 * import and stop pinging it; at OBD_CLEANUP_EXPORTS destroy the ptlrpcd
 * work items, the client import and the lprocfs entries.
 *
 * NOTE(review): the switch header and break statements are elided in
 * this extract.
 */
2779 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2785 case OBD_CLEANUP_EARLY: {
2786 struct obd_import *imp;
2787 imp = obd->u.cli.cl_import;
2788 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2789 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2790 ptlrpc_deactivate_import(imp);
2791 spin_lock(&imp->imp_lock);
2792 imp->imp_pingable = 0;
2793 spin_unlock(&imp->imp_lock);
2796 case OBD_CLEANUP_EXPORTS: {
2797 struct client_obd *cli = &obd->u.cli;
/*
2799 * for echo client, export may be on zombie list, wait for
2800 * zombie thread to cull it, because cli.cl_import will be
2801 * cleared in client_disconnect_export():
2802 * class_export_destroy() -> obd_cleanup() ->
2803 * echo_device_free() -> echo_client_cleanup() ->
2804 * obd_disconnect() -> osc_disconnect() ->
2805 * client_disconnect_export()
 */
2807 obd_zombie_barrier();
2808 if (cli->cl_writeback_work) {
2809 ptlrpcd_destroy_work(cli->cl_writeback_work);
2810 cli->cl_writeback_work = NULL;
2812 if (cli->cl_lru_work) {
2813 ptlrpcd_destroy_work(cli->cl_lru_work);
2814 cli->cl_lru_work = NULL;
2816 obd_cleanup_client_import(obd);
2817 ptlrpc_lprocfs_unregister_obd(obd);
2818 lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup() - final teardown: detach from the shared client cache,
 * free the quota cache and clean up the client obd.
 */
2825 int osc_cleanup(struct obd_device *obd)
2827 struct client_obd *cli = &obd->u.cli;
/* detach from the LRU of the shared client cache, if attached */
2833 if (cli->cl_cache != NULL) {
2834 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2835 spin_lock(&cli->cl_cache->ccc_lru_lock);
2836 list_del_init(&cli->cl_lru_osc);
2837 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2838 cli->cl_lru_left = NULL;
2839 cl_cache_decref(cli->cl_cache);
2840 cli->cl_cache = NULL;
2843 /* free memory of osc quota cache */
2844 osc_quota_cleanup(obd);
2846 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base() - apply a PARAM_OSC proc-style parameter from
 * @lcfg; positive return codes from class_process_proc_param() are
 * normalized to 0.
 */
2852 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2854 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2855 return rc > 0 ? 0: rc;
/* osc_process_config() - obd_ops entry point; thin wrapper that forwards
 * the config buffer to osc_process_config_base(). */
2858 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2860 return osc_process_config_base(obd, buf);
/*
 * Method table wiring the OSC into the obd framework.  Connection
 * management is shared with the generic client code; setup/cleanup,
 * statfs, object and attribute operations are OSC-specific.
 */
2863 static struct obd_ops osc_obd_ops = {
2864 .o_owner = THIS_MODULE,
2865 .o_setup = osc_setup,
2866 .o_precleanup = osc_precleanup,
2867 .o_cleanup = osc_cleanup,
2868 .o_add_conn = client_import_add_conn,
2869 .o_del_conn = client_import_del_conn,
2870 .o_connect = client_connect_import,
2871 .o_reconnect = osc_reconnect,
2872 .o_disconnect = osc_disconnect,
2873 .o_statfs = osc_statfs,
2874 .o_statfs_async = osc_statfs_async,
2875 .o_create = osc_create,
2876 .o_destroy = osc_destroy,
2877 .o_getattr = osc_getattr,
2878 .o_setattr = osc_setattr,
2879 .o_iocontrol = osc_iocontrol,
2880 .o_set_info_async = osc_set_info_async,
2881 .o_import_event = osc_import_event,
2882 .o_process_config = osc_process_config,
2883 .o_quotactl = osc_quotactl,
/*
 * osc_init() - module entry point: initialize the lu_kmem caches and
 * register the OSC obd type.  Proc registration is suppressed when the
 * OSP type already owns the proc directory (client and server colocated).
 * On registration failure the caches are released again.
 */
2886 static int __init osc_init(void)
2888 bool enable_proc = true;
2889 struct obd_type *type;
2893 /* print an address of _any_ initialized kernel symbol from this
2894 * module, to allow debugging with gdb that doesn't support data
2895 * symbols from modules.*/
2896 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2898 rc = lu_kmem_init(osc_caches);
2902 type = class_search_type(LUSTRE_OSP_NAME);
2903 if (type != NULL && type->typ_procsym != NULL)
2904 enable_proc = false;
2906 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2907 LUSTRE_OSC_NAME, &osc_device_type);
/* undo the cache init if type registration failed */
2909 lu_kmem_fini(osc_caches);
/* osc_exit() - module exit: unregister the OSC type and free the caches. */
2916 static void /*__exit*/ osc_exit(void)
2918 class_unregister_type(LUSTRE_OSC_NAME);
2919 lu_kmem_fini(osc_caches);
/* Kernel module metadata and init/exit hookup. */
2922 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2923 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2924 MODULE_VERSION(LUSTRE_VERSION_STRING);
2925 MODULE_LICENSE("GPL");
2927 module_init(osc_init);
2928 module_exit(osc_exit);