4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre/lustre_user.h>
43 #include <lprocfs_status.h>
44 #include <lustre_debug.h>
45 #include <lustre_dlm.h>
46 #include <lustre_fid.h>
47 #include <lustre_ha.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_net.h>
50 #include <lustre_obdo.h>
51 #include <lustre_param.h>
53 #include <obd_cksum.h>
54 #include <obd_class.h>
56 #include "osc_cl_internal.h"
57 #include "osc_internal.h"
/* Shared ptlrpc request pool for OSC BRW RPCs (allocated from in
 * osc_brw_prep_request() for writes), bounded by osc_reqpool_mem_max. */
59 atomic_t osc_pool_req_count;
60 unsigned int osc_reqpool_maxreqcount;
61 struct ptlrpc_request_pool *osc_rq_pool;
63 /* max memory used for request pool, unit is MB */
64 static unsigned int osc_reqpool_mem_max = 5;
65 module_param(osc_reqpool_mem_max, uint, 0444);
/* Per-request async state for BRW RPCs, stashed in rq_async_args at
 * build time and read back in brw_interpret(): the page array, the
 * owning client, and the oap/extent lists covered by this RPC. */
67 struct osc_brw_async_args {
73 	struct brw_page **aa_ppga;
74 	struct client_obd *aa_cli;
75 	struct list_head aa_oaps;
76 	struct list_head aa_exts;
/* Grant-shrink RPCs reuse the BRW async-args layout (see
 * osc_shrink_grant_interpret()). */
79 #define osc_grant_args osc_brw_async_args
/* Async-argument blocks for the various OSC RPC types below.  Each is
 * stored in the request's rq_async_args (size-checked via CLASSERT at
 * the call sites) and consumed by the matching *_interpret() callback.
 * The *_upcall/*_cookie pairs are the caller-provided completion hook. */
81 struct osc_setattr_args {
83 	obd_enqueue_update_f sa_upcall;
87 struct osc_fsync_args {
88 	struct osc_object *fa_obj;
90 	obd_enqueue_update_f fa_upcall;
94 struct osc_ladvise_args {
96 	obd_enqueue_update_f la_upcall;
/* Arguments carried across an async LDLM enqueue (lock request). */
100 struct osc_enqueue_args {
101 	struct obd_export *oa_exp;
102 	enum ldlm_type oa_type;
103 	enum ldlm_mode oa_mode;
105 	osc_enqueue_upcall_f oa_upcall;
107 	struct ost_lvb *oa_lvb;
108 	struct lustre_handle oa_lockh;
	/* non-zero for AGL (asynchronous glimpse lock) requests */
109 	unsigned int oa_agl:1;
/* Forward declarations for the BRW completion path defined later. */
112 static void osc_release_ppga(struct brw_page **ppga, size_t count);
113 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/**
 * Copy @oa into the OST_BODY field of an already-packed request,
 * converting to wire format per the import's connect data.
 */
116 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
118 	struct ost_body *body;
120 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
123 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/**
 * Synchronous OST_GETATTR: fetch the object attributes described by @oa
 * from the OST and unpack the reply back into @oa.  Blocks in
 * ptlrpc_queue_wait(); returns 0 or a negative errno.
 */
126 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
129 	struct ptlrpc_request *req;
130 	struct ost_body *body;
134 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
138 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
140 		ptlrpc_request_free(req);
144 	osc_pack_req_body(req, oa);
146 	ptlrpc_request_set_replen(req);
148 	rc = ptlrpc_queue_wait(req);
152 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	/* a missing reply body is a protocol violation */
154 		GOTO(out, rc = -EPROTO);
156 	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
157 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
	/* blksize is client-determined (BRW size), not returned by the OST */
159 	oa->o_blksize = cli_brw_size(exp->exp_obd);
160 	oa->o_valid |= OBD_MD_FLBLKSZ;
164 	ptlrpc_req_finished(req);
/**
 * Synchronous OST_SETATTR: push the attributes in @oa to the OST and
 * refresh @oa from the reply.  Caller must have set OBD_MD_FLGROUP.
 */
169 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
172 	struct ptlrpc_request *req;
173 	struct ost_body *body;
177 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
179 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
183 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
185 		ptlrpc_request_free(req);
189 	osc_pack_req_body(req, oa);
191 	ptlrpc_request_set_replen(req);
193 	rc = ptlrpc_queue_wait(req);
197 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
199 		GOTO(out, rc = -EPROTO);
201 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
205 	ptlrpc_req_finished(req);
/**
 * Completion callback shared by async setattr and punch RPCs: unpack the
 * reply obdo into sa->sa_oa, then invoke the caller's upcall with the
 * final status.
 */
210 static int osc_setattr_interpret(const struct lu_env *env,
211 				 struct ptlrpc_request *req,
212 				 struct osc_setattr_args *sa, int rc)
214 	struct ost_body *body;
220 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222 		GOTO(out, rc = -EPROTO);
224 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
227 	rc = sa->sa_upcall(sa->sa_cookie, rc);
/**
 * Asynchronous OST_SETATTR.  If @rqset is NULL the request is fired via
 * ptlrpcd without waiting for a reply; otherwise @upcall/@cookie are
 * wired into osc_setattr_interpret() and the request is added to @rqset
 * (or handed to ptlrpcd when rqset == PTLRPCD_SET).
 */
231 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
232 		      obd_enqueue_update_f upcall, void *cookie,
233 		      struct ptlrpc_request_set *rqset)
235 	struct ptlrpc_request *req;
236 	struct osc_setattr_args *sa;
241 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
245 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
247 		ptlrpc_request_free(req);
251 	osc_pack_req_body(req, oa);
253 	ptlrpc_request_set_replen(req);
255 	/* do mds to ost setattr asynchronously */
257 		/* Do not wait for response. */
258 		ptlrpcd_add_req(req);
260 		req->rq_interpret_reply =
261 			(ptlrpc_interpterer_t)osc_setattr_interpret;
		/* async args must fit in the request's embedded storage */
263 		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
264 		sa = ptlrpc_req_async_args(req);
266 		sa->sa_upcall = upcall;
267 		sa->sa_cookie = cookie;
269 		if (rqset == PTLRPCD_SET)
270 			ptlrpcd_add_req(req);
272 			ptlrpc_set_add_req(rqset, req);
/**
 * Completion callback for OST_LADVISE: copy the reply obdo back to the
 * caller's buffer and run the upcall with the RPC status.
 */
278 static int osc_ladvise_interpret(const struct lu_env *env,
279 				 struct ptlrpc_request *req,
282 	struct osc_ladvise_args *la = arg;
283 	struct ost_body *body;
289 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291 		GOTO(out, rc = -EPROTO);
293 	*la->la_oa = body->oa;
295 	rc = la->la_upcall(la->la_cookie, rc);
300  * If rqset is NULL, do not wait for response. Upcall and cookie could also
301  * be NULL in this case
/**
 * Send an OST_LADVISE RPC carrying @num_advise lu_ladvise hints
 * (prefetch/cache advice) for the object described by @oa.  The request
 * goes to the OST I/O portal with an adaptive timeout; completion is
 * delivered through osc_ladvise_interpret().
 */
303 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
304 		     struct ladvise_hdr *ladvise_hdr,
305 		     obd_enqueue_update_f upcall, void *cookie,
306 		     struct ptlrpc_request_set *rqset)
308 	struct ptlrpc_request *req;
309 	struct ost_body *body;
310 	struct osc_ladvise_args *la;
312 	struct lu_ladvise *req_ladvise;
313 	struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
314 	int num_advise = ladvise_hdr->lah_count;
315 	struct ladvise_hdr *req_ladvise_hdr;
318 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
	/* the ladvise buffer is variable-sized: one lu_ladvise per hint */
322 	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
323 			     num_advise * sizeof(*ladvise));
324 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
326 		ptlrpc_request_free(req);
329 	req->rq_request_portal = OST_IO_PORTAL;
330 	ptlrpc_at_set_req_timeout(req);
332 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
334 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
337 	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
338 						 &RMF_OST_LADVISE_HDR);
339 	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
341 	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
342 	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
343 	ptlrpc_request_set_replen(req);
346 		/* Do not wait for response. */
347 		ptlrpcd_add_req(req);
351 	req->rq_interpret_reply = osc_ladvise_interpret;
352 	CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
353 	la = ptlrpc_req_async_args(req);
355 	la->la_upcall = upcall;
356 	la->la_cookie = cookie;
358 	if (rqset == PTLRPCD_SET)
359 		ptlrpcd_add_req(req);
361 		ptlrpc_set_add_req(rqset, req);
/**
 * Synchronous OST_CREATE.  Only reachable for echo-client objects (the
 * fid_seq_is_echo() assertion); normal object creation goes through the
 * MDS.  On success @oa is refreshed from the reply.
 */
366 static int osc_create(const struct lu_env *env, struct obd_export *exp,
369 	struct ptlrpc_request *req;
370 	struct ost_body *body;
375 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
376 	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
378 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
380 		GOTO(out, rc = -ENOMEM);
382 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
384 		ptlrpc_request_free(req);
388 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
391 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
393 	ptlrpc_request_set_replen(req);
395 	rc = ptlrpc_queue_wait(req);
399 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
401 		GOTO(out_req, rc = -EPROTO);
403 	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
404 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
406 	oa->o_blksize = cli_brw_size(exp->exp_obd);
407 	oa->o_valid |= OBD_MD_FLBLKSZ;
409 	CDEBUG(D_HA, "transno: "LPD64"\n",
410 	       lustre_msg_get_transno(req->rq_repmsg));
412 	ptlrpc_req_finished(req);
/**
 * Asynchronous OST_PUNCH (truncate/hole-punch).  The extent to punch is
 * carried inside @oa; completion reuses osc_setattr_interpret() so the
 * caller's @upcall sees the updated attributes.
 */
417 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
418 		   obd_enqueue_update_f upcall, void *cookie,
419 		   struct ptlrpc_request_set *rqset)
421 	struct ptlrpc_request *req;
422 	struct osc_setattr_args *sa;
423 	struct ost_body *body;
427 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
431 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
433 		ptlrpc_request_free(req);
436 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
437 	ptlrpc_at_set_req_timeout(req);
439 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
443 	ptlrpc_request_set_replen(req);
445 	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
446 	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
447 	sa = ptlrpc_req_async_args(req);
449 	sa->sa_upcall = upcall;
450 	sa->sa_cookie = cookie;
451 	if (rqset == PTLRPCD_SET)
452 		ptlrpcd_add_req(req);
454 		ptlrpc_set_add_req(rqset, req);
/**
 * Completion callback for OST_SYNC: copy the reply obdo to the caller,
 * push the returned blocks count into the osc object's cached cl_attr
 * (under the attr lock), then run the upcall.
 */
459 static int osc_sync_interpret(const struct lu_env *env,
460 			      struct ptlrpc_request *req,
463 	struct osc_fsync_args *fa = arg;
464 	struct ost_body *body;
465 	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
466 	unsigned long valid = 0;
467 	struct cl_object *obj;
473 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475 		CERROR("can't unpack ost_body\n");
476 		GOTO(out, rc = -EPROTO);
479 	*fa->fa_oa = body->oa;
480 	obj = osc2cl(fa->fa_obj);
482 	/* Update osc object's blocks attribute */
483 	cl_object_attr_lock(obj);
484 	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
485 		attr->cat_blocks = body->oa.o_blocks;
490 	cl_object_attr_update(env, obj, attr, valid);
491 	cl_object_attr_unlock(obj);
494 	rc = fa->fa_upcall(fa->fa_cookie, rc);
/**
 * Asynchronous OST_SYNC for @obj.  The byte range to sync is encoded in
 * @oa's size/blocks fields (noted below); completion flows through
 * osc_sync_interpret() to the caller's @upcall.
 */
498 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
499 		  obd_enqueue_update_f upcall, void *cookie,
500 		  struct ptlrpc_request_set *rqset)
502 	struct obd_export *exp = osc_export(obj);
503 	struct ptlrpc_request *req;
504 	struct ost_body *body;
505 	struct osc_fsync_args *fa;
509 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
513 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
515 		ptlrpc_request_free(req);
519 	/* overload the size and blocks fields in the oa with start/end */
520 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
522 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
524 	ptlrpc_request_set_replen(req);
525 	req->rq_interpret_reply = osc_sync_interpret;
527 	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
528 	fa = ptlrpc_req_async_args(req);
531 	fa->fa_upcall = upcall;
532 	fa->fa_cookie = cookie;
534 	if (rqset == PTLRPCD_SET)
535 		ptlrpcd_add_req(req);
537 		ptlrpc_set_add_req(rqset, req);
542 /* Find and cancel locally locks matched by @mode in the resource found by
543  * @objid. Found locks are added into @cancel list. Returns the amount of
544  * locks added to @cancels list. */
545 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
546 				   struct list_head *cancels,
547 				   enum ldlm_mode mode, __u64 lock_flags)
549 	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
550 	struct ldlm_res_id res_id;
551 	struct ldlm_resource *res;
555 	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
556 	 * export) but disabled through procfs (flag in NS).
558 	 * This distinguishes from a case when ELC is not supported originally,
559 	 * when we still want to cancel locks in advance and just cancel them
560 	 * locally, without sending any RPC. */
561 	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
	/* look up the LDLM resource for this object id */
564 	ostid_build_res_name(&oa->o_oi, &res_id);
565 	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
569 	LDLM_RESOURCE_ADDREF(res);
570 	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
571 					   lock_flags, 0, NULL);
572 	LDLM_RESOURCE_DELREF(res);
573 	ldlm_resource_putref(res);
/**
 * Completion callback for OST_DESTROY: release this RPC's slot in the
 * in-flight destroy counter and wake any waiter in osc_destroy().
 */
577 static int osc_destroy_interpret(const struct lu_env *env,
578 				 struct ptlrpc_request *req, void *data,
581 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
583 	atomic_dec(&cli->cl_destroy_in_flight);
584 	wake_up(&cli->cl_destroy_waitq);
/**
 * Try to reserve a destroy-RPC slot, keeping the number of concurrent
 * destroys below cl_max_rpcs_in_flight.  Optimistically increments the
 * counter and backs out on failure; the dec/compare on the failure path
 * re-wakes a waiter to close the race with a concurrent completion.
 */
588 static int osc_can_send_destroy(struct client_obd *cli)
590 	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
591 	    cli->cl_max_rpcs_in_flight) {
592 		/* The destroy request can be sent */
595 	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
596 	    cli->cl_max_rpcs_in_flight) {
598 		 * The counter has been modified between the two atomic
601 		wake_up(&cli->cl_destroy_waitq);
/**
 * OST_DESTROY: destroy the object described by @oa.  First cancels
 * matching local PW locks (early lock cancel, discarding data) and
 * piggy-backs those cancels on the destroy RPC; throttles itself so at
 * most cl_max_rpcs_in_flight destroys are outstanding, then fires the
 * request via ptlrpcd without waiting for the reply.
 */
606 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
609 	struct client_obd *cli = &exp->exp_obd->u.cli;
610 	struct ptlrpc_request *req;
611 	struct ost_body *body;
612 	struct list_head cancels = LIST_HEAD_INIT(cancels);
617 		CDEBUG(D_INFO, "oa NULL\n");
621 	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
622 					LDLM_FL_DISCARD_DATA);
624 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
		/* allocation failed: drop the collected cancel list */
626 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
630 	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
633 		ptlrpc_request_free(req);
637 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
638 	ptlrpc_at_set_req_timeout(req);
640 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
642 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
644 	ptlrpc_request_set_replen(req);
646 	req->rq_interpret_reply = osc_destroy_interpret;
647 	if (!osc_can_send_destroy(cli)) {
648 		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
651 		 * Wait until the number of on-going destroy RPCs drops
652 		 * under max_rpc_in_flight
654 		l_wait_event_exclusive(cli->cl_destroy_waitq,
655 				       osc_can_send_destroy(cli), &lwi);
658 	/* Do not wait for response */
659 	ptlrpcd_add_req(req);
/**
 * Fill the dirty/undirty/grant accounting fields of @oa so the OST
 * learns how much cache this client holds and how much more grant it
 * wants.  All accounting is read/updated under cl_loi_list_lock; the
 * CERRORs flag inconsistent dirty accounting without failing the RPC.
 */
663 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
666 	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
	/* caller must not have set the fields this function owns */
668 	LASSERT(!(oa->o_valid & bits));
671 	spin_lock(&cli->cl_loi_list_lock);
672 	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
673 		oa->o_dirty = cli->cl_dirty_grant;
675 		oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
676 	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
677 		     cli->cl_dirty_max_pages)) {
678 		CERROR("dirty %lu - %lu > dirty_max %lu\n",
679 		       cli->cl_dirty_pages, cli->cl_dirty_transit,
680 		       cli->cl_dirty_max_pages);
682 	} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
683 			    atomic_long_read(&obd_dirty_transit_pages) >
684 			    (long)(obd_max_dirty_pages + 1))) {
685 		/* The atomic_read() allowing the atomic_inc() are
686 		 * not covered by a lock thus they may safely race and trip
687 		 * this CERROR() unless we add in a small fudge factor (+1). */
688 		CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
689 		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
690 		       atomic_long_read(&obd_dirty_transit_pages),
691 		       obd_max_dirty_pages);
693 	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
695 		CERROR("dirty %lu - dirty_max %lu too big???\n",
696 		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
699 		unsigned long nrpages;
		/* ask for enough grant to keep a full pipeline of RPCs busy */
701 		nrpages = cli->cl_max_pages_per_rpc;
702 		nrpages *= cli->cl_max_rpcs_in_flight + 1;
703 		nrpages = max(nrpages, cli->cl_dirty_max_pages);
704 		oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
705 		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
709 			/* take extent tax into account when asking for more
711 			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
712 				     cli->cl_max_extent_pages;
713 			oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
716 	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
717 	oa->o_dropped = cli->cl_lost_grant;
718 	cli->cl_lost_grant = 0;
719 	spin_unlock(&cli->cl_loi_list_lock);
720 	CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
721 	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/**
 * Re-arm the grant-shrink timer: schedule the next shrink attempt one
 * cl_grant_shrink_interval from now.
 */
724 void osc_update_next_shrink(struct client_obd *cli)
726 	cli->cl_next_shrink_grant =
727 		cfs_time_shift(cli->cl_grant_shrink_interval);
728 	CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
729 	       cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant, under the list lock. */
732 static void __osc_update_grant(struct client_obd *cli, u64 grant)
734 	spin_lock(&cli->cl_loi_list_lock);
735 	cli->cl_avail_grant += grant;
736 	spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server piggy-backed on an RPC reply. */
739 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
741 	if (body->oa.o_valid & OBD_MD_FLGRANT) {
742 		CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
743 		__osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: grant-shrink replies are sent via set_info. */
747 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
748 			      u32 keylen, void *key,
749 			      u32 vallen, void *val,
750 			      struct ptlrpc_request_set *set);
/**
 * Completion callback for a grant-shrink RPC.  On failure the grant we
 * tried to give back is restored locally; on success the server's reply
 * grant is absorbed via osc_update_grant().
 */
752 static int osc_shrink_grant_interpret(const struct lu_env *env,
753 				      struct ptlrpc_request *req,
756 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
757 	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
758 	struct ost_body *body;
761 		__osc_update_grant(cli, oa->o_grant);
765 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
767 	osc_update_grant(cli, body);
/**
 * Give back a quarter of the locally available grant by advertising it
 * in @oa with OBD_FL_SHRINK_GRANT set, then re-arm the shrink timer.
 * Used to piggy-back a shrink on an outgoing BRW.
 */
773 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
775 	spin_lock(&cli->cl_loi_list_lock);
776 	oa->o_grant = cli->cl_avail_grant / 4;
777 	cli->cl_avail_grant -= oa->o_grant;
778 	spin_unlock(&cli->cl_loi_list_lock);
779 	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
780 		oa->o_valid |= OBD_MD_FLFLAGS;
783 	oa->o_flags |= OBD_FL_SHRINK_GRANT;
784 	osc_update_next_shrink(cli);
787 /* Shrink the current grant, either from some large amount to enough for a
788  * full set of in-flight RPCs, or if we have already shrunk to that limit
789  * then to enough for a single RPC. This avoids keeping more grant than
790  * needed, and avoids shrinking the grant piecemeal. */
791 static int osc_shrink_grant(struct client_obd *cli)
	/* target: enough grant for a full pipeline of max-size RPCs */
793 	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
794 			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
796 	spin_lock(&cli->cl_loi_list_lock);
797 	if (cli->cl_avail_grant <= target_bytes)
		/* already at/below pipeline size: fall back to a single RPC */
798 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
799 	spin_unlock(&cli->cl_loi_list_lock);
801 	return osc_shrink_grant_to_target(cli, target_bytes);
/**
 * Return grant above @target_bytes to the server via a KEY_GRANT_SHRINK
 * set_info RPC.  The target is clamped to at least one full RPC's worth;
 * nothing is sent when available grant is already at or below target.
 * On send failure the grant is restored locally.
 */
804 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
807 	struct ost_body *body;
810 	spin_lock(&cli->cl_loi_list_lock);
811 	/* Don't shrink if we are already above or below the desired limit
812 	 * We don't want to shrink below a single RPC, as that will negatively
813 	 * impact block allocation and long-term performance. */
814 	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
815 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
817 	if (target_bytes >= cli->cl_avail_grant) {
818 		spin_unlock(&cli->cl_loi_list_lock);
821 	spin_unlock(&cli->cl_loi_list_lock);
827 	osc_announce_cached(cli, &body->oa, 0);
829 	spin_lock(&cli->cl_loi_list_lock);
830 	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
831 	cli->cl_avail_grant = target_bytes;
832 	spin_unlock(&cli->cl_loi_list_lock);
833 	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
834 		body->oa.o_valid |= OBD_MD_FLFLAGS;
835 		body->oa.o_flags = 0;
837 	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
838 	osc_update_next_shrink(cli);
840 	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
841 				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
842 				sizeof(*body), body, NULL);
		/* shrink RPC failed: take the grant back locally */
844 		__osc_update_grant(cli, body->oa.o_grant);
/**
 * Decide whether it is time to shrink this client's grant: requires the
 * server to support GRANT_SHRINK, the shrink interval to have (nearly)
 * elapsed, a FULL import, and more grant than one RPC needs.
 */
849 static int osc_should_shrink_grant(struct client_obd *client)
851 	cfs_time_t time = cfs_time_current();
852 	cfs_time_t next_shrink = client->cl_next_shrink_grant;
854 	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
855 	     OBD_CONNECT_GRANT_SHRINK) == 0)
	/* allow firing slightly early (5 ticks) to avoid missing the slot */
858 	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
859 		/* Get the current RPC size directly, instead of going via:
860 		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
861 		 * Keep comment here so that it can be found by searching. */
862 		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
864 		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
865 		    client->cl_avail_grant > brw_size)
868 			osc_update_next_shrink(client);
/**
 * Periodic timeout callback: walk every client registered on the shrink
 * list and shrink grant for those that are due.
 */
873 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
875 	struct client_obd *client;
877 	list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
878 		if (osc_should_shrink_grant(client))
879 			osc_shrink_grant(client);
/**
 * Register @client with the periodic grant-shrink timeout machinery and
 * arm its first shrink deadline; logs and returns the error on failure.
 */
884 static int osc_add_shrink_grant(struct client_obd *client)
888 	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
890 				       osc_grant_shrink_grant_cb, NULL,
891 				       &client->cl_grant_shrink_list);
893 		CERROR("add grant client %s error %d\n", cli_name(client), rc);
896 	CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
897 	osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout list. */
901 static int osc_del_shrink_grant(struct client_obd *client)
903 	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/**
 * Initialize grant state from the connect data @ocd at (re)connect time:
 * set cl_avail_grant from ocd_grant (accounting for dirty/reserved grant
 * unless we were evicted), derive chunk size and max extent size when
 * the server supports GRANT_PARAM, and enroll in periodic grant shrink
 * when GRANT_SHRINK is negotiated.
 */
907 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
910 	 * ocd_grant is the total grant amount we're expect to hold: if we've
911 	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
912 	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
915 	 * race is tolerable here: if we're evicted, but imp_state already
916 	 * left EVICTED state, then cl_dirty_pages must be 0 already.
918 	spin_lock(&cli->cl_loi_list_lock);
919 	cli->cl_avail_grant = ocd->ocd_grant;
920 	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
921 		cli->cl_avail_grant -= cli->cl_reserved_grant;
922 		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
923 			cli->cl_avail_grant -= cli->cl_dirty_grant;
925 			cli->cl_avail_grant -=
926 					cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
929 	if (cli->cl_avail_grant < 0) {
930 		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
931 		      cli_name(cli), cli->cl_avail_grant,
932 		      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
933 		/* workaround for servers which do not have the patch from
935 		cli->cl_avail_grant = ocd->ocd_grant;
938 	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
941 		/* overhead for each extent insertion */
942 		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
943 		/* determine the appropriate chunk size used by osc_extent. */
944 		cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT,
945 					  ocd->ocd_grant_blkbits);
946 		/* determine maximum extent size, in #pages */
947 		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
948 		cli->cl_max_extent_pages = size >> PAGE_CACHE_SHIFT;
949 		if (cli->cl_max_extent_pages == 0)
950 			cli->cl_max_extent_pages = 1;
		/* no GRANT_PARAM: fall back to page-sized chunks, default cap */
952 		cli->cl_grant_extent_tax = 0;
953 		cli->cl_chunkbits = PAGE_CACHE_SHIFT;
954 		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
956 	spin_unlock(&cli->cl_loi_list_lock);
958 	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
959 		"chunk bits: %d cl_max_extent_pages: %d\n",
961 		cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
962 		cli->cl_max_extent_pages);
964 	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
965 	    list_empty(&cli->cl_grant_shrink_list))
966 		osc_add_shrink_grant(cli);
969 /* We assume that the reason this OSC got a short read is because it read
970  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
971  * via the LOV, and it _knows_ it's reading inside the file, it's just that
972  * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of a short read: skip fully-read pages, zero the
 * remainder of the partially-read page, then zero all following pages. */
973 static void handle_short_read(int nob_read, size_t page_count,
974 			      struct brw_page **pga)
979 	/* skip bytes read OK */
980 	while (nob_read > 0) {
981 		LASSERT (page_count > 0);
983 		if (pga[i]->count > nob_read) {
984 			/* EOF inside this page */
985 			ptr = kmap(pga[i]->pg) +
986 				(pga[i]->off & ~PAGE_MASK);
987 			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
994 		nob_read -= pga[i]->count;
999 	/* zero remaining pages */
1000 	while (page_count-- > 0) {
1001 		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1002 		memset(ptr, 0, pga[i]->count);
/**
 * Validate the per-niobuf RC vector of a BRW_WRITE reply: fail on a
 * missing/short vector, propagate the first negative per-niobuf rc,
 * reject non-zero rcs, and verify the bulk transferred exactly the
 * requested number of bytes.
 */
1008 static int check_write_rcs(struct ptlrpc_request *req,
1009 			   int requested_nob, int niocount,
1010 			   size_t page_count, struct brw_page **pga)
1015 	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1016 						  sizeof(*remote_rcs) *
1018 	if (remote_rcs == NULL) {
1019 		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1023 	/* return error if any niobuf was in error */
1024 	for (i = 0; i < niocount; i++) {
1025 		if ((int)remote_rcs[i] < 0)
1026 			return(remote_rcs[i]);
1028 		if (remote_rcs[i] != 0) {
1029 			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1030 				i, remote_rcs[i], req);
1035 	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1036 		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1037 		       req->rq_bulk->bd_nob_transferred, requested_nob);
/**
 * Two brw_pages can share one niobuf when they are byte-contiguous and
 * their flags differ only in bits known to be safe to combine; unknown
 * differing flag bits are reported once via CWARN.
 */
1044 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1046 	if (p1->flag != p2->flag) {
1047 		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1048 				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
1049 				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1051 		/* warn if we try to combine flags that we don't know to be
1052 		 * safe to combine */
1053 		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1054 			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1055 			      "report this at https://jira.hpdd.intel.com/\n",
1056 			      p1->flag, p2->flag);
	/* mergeable only when p2 starts exactly where p1 ends */
1061 	return (p1->off + p1->count == p2->off);
/**
 * Compute the bulk-data checksum over the first @nob bytes of @pga using
 * the libcfs crypto hash selected by @cksum_type.  Contains two fault-
 * injection hooks: corrupting read data before hashing, and (for writes)
 * corrupting the computed checksum so the data itself stays correct on
 * a resend.
 */
1064 static u32 osc_checksum_bulk(int nob, size_t pg_count,
1065 			     struct brw_page **pga, int opc,
1066 			     cksum_type_t cksum_type)
1070 	struct cfs_crypto_hash_desc *hdesc;
1071 	unsigned int bufsize;
1073 	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1075 	LASSERT(pg_count > 0);
1077 	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1078 	if (IS_ERR(hdesc)) {
1079 		CERROR("Unable to initialize checksum hash %s\n",
1080 		       cfs_crypto_hash_name(cfs_alg));
1081 		return PTR_ERR(hdesc);
1084 	while (nob > 0 && pg_count > 0) {
1085 		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1087 		/* corrupt the data before we compute the checksum, to
1088 		 * simulate an OST->client data error */
1089 		if (i == 0 && opc == OST_READ &&
1090 		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1091 			unsigned char *ptr = kmap(pga[i]->pg);
1092 			int off = pga[i]->off & ~PAGE_MASK;
1094 			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1097 		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1098 					    pga[i]->off & ~PAGE_MASK,
1100 		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1101 			       (int)(pga[i]->off & ~PAGE_MASK));
1103 		nob -= pga[i]->count;
1108 	bufsize = sizeof(cksum);
1109 	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1111 	/* For sending we only compute the wrong checksum instead
1112 	 * of corrupting the data so it is still correct on a redo */
1113 	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/**
 * Build (but do not send) a BRW read/write RPC for @page_count pages.
 * Merges contiguous/compatible pages into niobufs, attaches the bulk
 * descriptor, announces cached/dirty state, optionally piggy-backs a
 * grant shrink, and computes the bulk checksum for writes.  On success
 * the packed request is returned through @reqp with its async args
 * (osc_brw_async_args) initialized.  Writes draw requests from the
 * shared osc_rq_pool so writeback can proceed under memory pressure.
 */
1120 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1121 		     u32 page_count, struct brw_page **pga,
1122 		     struct ptlrpc_request **reqp, int resend)
1124 	struct ptlrpc_request *req;
1125 	struct ptlrpc_bulk_desc *desc;
1126 	struct ost_body *body;
1127 	struct obd_ioobj *ioobj;
1128 	struct niobuf_remote *niobuf;
1129 	int niocount, i, requested_nob, opc, rc;
1130 	struct osc_brw_async_args *aa;
1131 	struct req_capsule *pill;
1132 	struct brw_page *pg_prev;
	/* fault-injection points for allocation-failure testing */
1135 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1136 		RETURN(-ENOMEM); /* Recoverable */
1137 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1138 		RETURN(-EINVAL); /* Fatal */
1140 	if ((cmd & OBD_BRW_WRITE) != 0) {
1142 		req = ptlrpc_request_alloc_pool(cli->cl_import,
1144 						&RQF_OST_BRW_WRITE);
1147 		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	/* count niobufs: each run of mergeable pages collapses into one */
1152 	for (niocount = i = 1; i < page_count; i++) {
1153 		if (!can_merge_pages(pga[i - 1], pga[i]))
1157 	pill = &req->rq_pill;
1158 	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1160 	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1161 			     niocount * sizeof(*niobuf));
1163 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1165 		ptlrpc_request_free(req);
1168 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1169 	ptlrpc_at_set_req_timeout(req);
1170 	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1172 	req->rq_no_retry_einprogress = 1;
1174 	desc = ptlrpc_prep_bulk_imp(req, page_count,
1175 		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1176 		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1177 			PTLRPC_BULK_PUT_SINK) |
1178 			PTLRPC_BULK_BUF_KIOV,
1180 		&ptlrpc_bulk_kiov_pin_ops);
1183 		GOTO(out, rc = -ENOMEM);
1184 	/* NB request now owns desc and will free it when it gets freed */
1186 	body = req_capsule_client_get(pill, &RMF_OST_BODY);
1187 	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1188 	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1189 	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1191 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1193 	obdo_to_ioobj(oa, ioobj);
1194 	ioobj->ioo_bufcnt = niocount;
1195 	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1196 	 * that might be send for this request. The actual number is decided
1197 	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1198 	 * "max - 1" for old client compatibility sending "0", and also so the
1199 	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1200 	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1201 	LASSERT(page_count > 0);
1203 	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1204 		struct brw_page *pg = pga[i];
1205 		int poff = pg->off & ~PAGE_MASK;
1207 		LASSERT(pg->count > 0);
1208 		/* make sure there is no gap in the middle of page array */
1209 		LASSERTF(page_count == 1 ||
1210 			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1211 			  ergo(i > 0 && i < page_count - 1,
1212 			       poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1213 			  ergo(i == page_count - 1, poff == 0)),
1214 			 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1215 			 i, page_count, pg, pg->off, pg->count);
1216 		LASSERTF(i == 0 || pg->off > pg_prev->off,
1217 			 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1218 			 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1220 			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1221 			 pg_prev->pg, page_private(pg_prev->pg),
1222 			 pg_prev->pg->index, pg_prev->off);
		/* SRVLOCK must be uniform across the whole request */
1223 		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1224 			(pg->flag & OBD_BRW_SRVLOCK));
1226 		desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
1227 		requested_nob += pg->count;
1229 		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			/* extend the previous niobuf instead of starting one */
1231 			niobuf->rnb_len += pg->count;
1233 			niobuf->rnb_offset = pg->off;
1234 			niobuf->rnb_len = pg->count;
1235 			niobuf->rnb_flags = pg->flag;
	/* sanity: we must have filled exactly niocount niobufs */
1240 	LASSERTF((void *)(niobuf - niocount) ==
1241 		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1242 		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1243 		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1245 	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1247 	if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1248 		body->oa.o_valid |= OBD_MD_FLFLAGS;
1249 		body->oa.o_flags = 0;
		/* mark resends so the server can detect replayed writes */
1251 		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1254 	if (osc_should_shrink_grant(cli))
1255 		osc_shrink_grant_local(cli, &body->oa);
1257 	/* size[REQ_REC_OFF] still sizeof (*body) */
1258 	if (opc == OST_WRITE) {
1259 		if (cli->cl_checksum &&
1260 		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1261 			/* store cl_cksum_type in a local variable since
1262 			 * it can be changed via lprocfs */
1263 			cksum_type_t cksum_type = cli->cl_cksum_type;
1265 			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1266 				oa->o_flags &= OBD_FL_LOCAL_MASK;
1267 				body->oa.o_flags = 0;
1269 			body->oa.o_flags |= cksum_type_pack(cksum_type);
1270 			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1271 			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1275 			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1277 			/* save this in 'oa', too, for later checking */
1278 			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1279 			oa->o_flags |= cksum_type_pack(cksum_type);
1281 			/* clear out the checksum flag, in case this is a
1282 			 * resend but cl_checksum is no longer set. b=11238 */
1283 			oa->o_valid &= ~OBD_MD_FLCKSUM;
1285 		oa->o_cksum = body->oa.o_cksum;
1286 		/* 1 RC per niobuf */
1287 		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1288 				     sizeof(__u32) * niocount);
		/* read: just request a checksummed reply if enabled */
1290 		if (cli->cl_checksum &&
1291 		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1292 			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1293 				body->oa.o_flags = 0;
1294 			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1295 			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1298 	ptlrpc_request_set_replen(req);
1300 	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1301 	aa = ptlrpc_req_async_args(req);
1303 	aa->aa_requested_nob = requested_nob;
1304 	aa->aa_nio_count = niocount;
1305 	aa->aa_page_count = page_count;
1309 	INIT_LIST_HEAD(&aa->aa_oaps);
1312 	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1313 	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1314 	       req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1315 	       niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1319 	ptlrpc_req_finished(req);
1323 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1324 __u32 client_cksum, __u32 server_cksum, int nob,
1325 size_t page_count, struct brw_page **pga,
1326 cksum_type_t client_cksum_type)
1330 cksum_type_t cksum_type;
1332 if (server_cksum == client_cksum) {
1333 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1337 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1339 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1342 if (cksum_type != client_cksum_type)
1343 msg = "the server did not use the checksum type specified in "
1344 "the original request - likely a protocol problem";
1345 else if (new_cksum == server_cksum)
1346 msg = "changed on the client after we checksummed it - "
1347 "likely false positive due to mmap IO (bug 11742)";
1348 else if (new_cksum == client_cksum)
1349 msg = "changed in transit before arrival at OST";
1351 msg = "changed in transit AND doesn't match the original - "
1352 "likely false positive due to mmap IO (bug 11742)";
1354 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1355 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1356 msg, libcfs_nid2str(peer->nid),
1357 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1358 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1359 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1360 POSTID(&oa->o_oi), pga[0]->off,
1361 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1362 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1363 "client csum now %x\n", client_cksum, client_cksum_type,
1364 server_cksum, cksum_type, new_cksum);
1368 /* Note rc enters this function as number of bytes transferred */
1369 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1371 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1372 const lnet_process_id_t *peer =
1373 &req->rq_import->imp_connection->c_peer;
1374 struct client_obd *cli = aa->aa_cli;
1375 struct ost_body *body;
1376 u32 client_cksum = 0;
1379 if (rc < 0 && rc != -EDQUOT) {
1380 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1384 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1385 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1387 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1391 /* set/clear over quota flag for a uid/gid */
1392 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1393 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1394 unsigned int qid[LL_MAXQUOTAS] =
1395 {body->oa.o_uid, body->oa.o_gid};
1397 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1398 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1400 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1403 osc_update_grant(cli, body);
1408 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1409 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1411 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1413 CERROR("Unexpected +ve rc %d\n", rc);
1416 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1418 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1421 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1422 check_write_checksum(&body->oa, peer, client_cksum,
1423 body->oa.o_cksum, aa->aa_requested_nob,
1424 aa->aa_page_count, aa->aa_ppga,
1425 cksum_type_unpack(aa->aa_oa->o_flags)))
1428 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1429 aa->aa_page_count, aa->aa_ppga);
1433 /* The rest of this function executes only for OST_READs */
1435 /* if unwrap_bulk failed, return -EAGAIN to retry */
1436 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1438 GOTO(out, rc = -EAGAIN);
1440 if (rc > aa->aa_requested_nob) {
1441 CERROR("Unexpected rc %d (%d requested)\n", rc,
1442 aa->aa_requested_nob);
1446 if (rc != req->rq_bulk->bd_nob_transferred) {
1447 CERROR ("Unexpected rc %d (%d transferred)\n",
1448 rc, req->rq_bulk->bd_nob_transferred);
1452 if (rc < aa->aa_requested_nob)
1453 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1455 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1456 static int cksum_counter;
1457 u32 server_cksum = body->oa.o_cksum;
1460 cksum_type_t cksum_type;
1462 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1463 body->oa.o_flags : 0);
1464 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1465 aa->aa_ppga, OST_READ,
1468 if (peer->nid != req->rq_bulk->bd_sender) {
1470 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1473 if (server_cksum != client_cksum) {
1474 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1475 "%s%s%s inode "DFID" object "DOSTID
1476 " extent ["LPU64"-"LPU64"]\n",
1477 req->rq_import->imp_obd->obd_name,
1478 libcfs_nid2str(peer->nid),
1480 body->oa.o_valid & OBD_MD_FLFID ?
1481 body->oa.o_parent_seq : (__u64)0,
1482 body->oa.o_valid & OBD_MD_FLFID ?
1483 body->oa.o_parent_oid : 0,
1484 body->oa.o_valid & OBD_MD_FLFID ?
1485 body->oa.o_parent_ver : 0,
1486 POSTID(&body->oa.o_oi),
1487 aa->aa_ppga[0]->off,
1488 aa->aa_ppga[aa->aa_page_count-1]->off +
1489 aa->aa_ppga[aa->aa_page_count-1]->count -
1491 CERROR("client %x, server %x, cksum_type %x\n",
1492 client_cksum, server_cksum, cksum_type);
1494 aa->aa_oa->o_cksum = client_cksum;
1498 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1501 } else if (unlikely(client_cksum)) {
1502 static int cksum_missed;
1505 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1506 CERROR("Checksum %u requested from %s but not sent\n",
1507 cksum_missed, libcfs_nid2str(peer->nid));
1513 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1514 aa->aa_oa, &body->oa);
1519 static int osc_brw_redo_request(struct ptlrpc_request *request,
1520 struct osc_brw_async_args *aa, int rc)
1522 struct ptlrpc_request *new_req;
1523 struct osc_brw_async_args *new_aa;
1524 struct osc_async_page *oap;
1527 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1528 "redo for recoverable error %d", rc);
1530 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1531 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1532 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1533 aa->aa_ppga, &new_req, 1);
1537 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1538 if (oap->oap_request != NULL) {
1539 LASSERTF(request == oap->oap_request,
1540 "request %p != oap_request %p\n",
1541 request, oap->oap_request);
1542 if (oap->oap_interrupted) {
1543 ptlrpc_req_finished(new_req);
1548 /* New request takes over pga and oaps from old request.
1549 * Note that copying a list_head doesn't work, need to move it... */
1551 new_req->rq_interpret_reply = request->rq_interpret_reply;
1552 new_req->rq_async_args = request->rq_async_args;
1553 new_req->rq_commit_cb = request->rq_commit_cb;
1554 /* cap resend delay to the current request timeout, this is similar to
1555 * what ptlrpc does (see after_reply()) */
1556 if (aa->aa_resends > new_req->rq_timeout)
1557 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1559 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1560 new_req->rq_generation_set = 1;
1561 new_req->rq_import_generation = request->rq_import_generation;
1563 new_aa = ptlrpc_req_async_args(new_req);
1565 INIT_LIST_HEAD(&new_aa->aa_oaps);
1566 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1567 INIT_LIST_HEAD(&new_aa->aa_exts);
1568 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1569 new_aa->aa_resends = aa->aa_resends;
1571 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1572 if (oap->oap_request) {
1573 ptlrpc_req_finished(oap->oap_request);
1574 oap->oap_request = ptlrpc_request_addref(new_req);
1578 /* XXX: This code will run into problem if we're going to support
1579 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1580 * and wait for all of them to be finished. We should inherit request
1581 * set from old request. */
1582 ptlrpcd_add_req(new_req);
1584 DEBUG_REQ(D_INFO, new_req, "new request");
1589 * ugh, we want disk allocation on the target to happen in offset order. we'll
1590 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1591 * fine for our small page arrays and doesn't require allocation. its an
1592 * insertion sort that swaps elements that are strides apart, shrinking the
1593 * stride down until its '1' and the array is sorted.
1595 static void sort_brw_pages(struct brw_page **array, int num)
1598 struct brw_page *tmp;
1602 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1607 for (i = stride ; i < num ; i++) {
1610 while (j >= stride && array[j - stride]->off > tmp->off) {
1611 array[j] = array[j - stride];
1616 } while (stride > 1);
1619 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1621 LASSERT(ppga != NULL);
1622 OBD_FREE(ppga, sizeof(*ppga) * count);
1625 static int brw_interpret(const struct lu_env *env,
1626 struct ptlrpc_request *req, void *data, int rc)
1628 struct osc_brw_async_args *aa = data;
1629 struct osc_extent *ext;
1630 struct osc_extent *tmp;
1631 struct client_obd *cli = aa->aa_cli;
1634 rc = osc_brw_fini_request(req, rc);
1635 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1636 /* When server return -EINPROGRESS, client should always retry
1637 * regardless of the number of times the bulk was resent already. */
1638 if (osc_recoverable_error(rc)) {
1639 if (req->rq_import_generation !=
1640 req->rq_import->imp_generation) {
1641 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1642 ""DOSTID", rc = %d.\n",
1643 req->rq_import->imp_obd->obd_name,
1644 POSTID(&aa->aa_oa->o_oi), rc);
1645 } else if (rc == -EINPROGRESS ||
1646 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1647 rc = osc_brw_redo_request(req, aa, rc);
1649 CERROR("%s: too many resent retries for object: "
1650 ""LPU64":"LPU64", rc = %d.\n",
1651 req->rq_import->imp_obd->obd_name,
1652 POSTID(&aa->aa_oa->o_oi), rc);
1657 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1662 struct obdo *oa = aa->aa_oa;
1663 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1664 unsigned long valid = 0;
1665 struct cl_object *obj;
1666 struct osc_async_page *last;
1668 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1669 obj = osc2cl(last->oap_obj);
1671 cl_object_attr_lock(obj);
1672 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1673 attr->cat_blocks = oa->o_blocks;
1674 valid |= CAT_BLOCKS;
1676 if (oa->o_valid & OBD_MD_FLMTIME) {
1677 attr->cat_mtime = oa->o_mtime;
1680 if (oa->o_valid & OBD_MD_FLATIME) {
1681 attr->cat_atime = oa->o_atime;
1684 if (oa->o_valid & OBD_MD_FLCTIME) {
1685 attr->cat_ctime = oa->o_ctime;
1689 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1690 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1691 loff_t last_off = last->oap_count + last->oap_obj_off +
1694 /* Change file size if this is an out of quota or
1695 * direct IO write and it extends the file size */
1696 if (loi->loi_lvb.lvb_size < last_off) {
1697 attr->cat_size = last_off;
1700 /* Extend KMS if it's not a lockless write */
1701 if (loi->loi_kms < last_off &&
1702 oap2osc_page(last)->ops_srvlock == 0) {
1703 attr->cat_kms = last_off;
1709 cl_object_attr_update(env, obj, attr, valid);
1710 cl_object_attr_unlock(obj);
1712 OBDO_FREE(aa->aa_oa);
1714 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1715 osc_inc_unstable_pages(req);
1717 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1718 list_del_init(&ext->oe_link);
1719 osc_extent_finish(env, ext, 1, rc);
1721 LASSERT(list_empty(&aa->aa_exts));
1722 LASSERT(list_empty(&aa->aa_oaps));
1724 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1725 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1727 spin_lock(&cli->cl_loi_list_lock);
1728 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1729 * is called so we know whether to go to sync BRWs or wait for more
1730 * RPCs to complete */
1731 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1732 cli->cl_w_in_flight--;
1734 cli->cl_r_in_flight--;
1735 osc_wake_cache_waiters(cli);
1736 spin_unlock(&cli->cl_loi_list_lock);
1738 osc_io_unplug(env, cli, NULL);
1742 static void brw_commit(struct ptlrpc_request *req)
1744 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1745 * this called via the rq_commit_cb, I need to ensure
1746 * osc_dec_unstable_pages is still called. Otherwise unstable
1747 * pages may be leaked. */
1748 spin_lock(&req->rq_lock);
1749 if (likely(req->rq_unstable)) {
1750 req->rq_unstable = 0;
1751 spin_unlock(&req->rq_lock);
1753 osc_dec_unstable_pages(req);
1755 req->rq_committed = 1;
1756 spin_unlock(&req->rq_lock);
1761 * Build an RPC by the list of extent @ext_list. The caller must ensure
1762 * that the total pages in this list are NOT over max pages per RPC.
1763 * Extents in the list must be in OES_RPC state.
1765 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1766 struct list_head *ext_list, int cmd)
1768 struct ptlrpc_request *req = NULL;
1769 struct osc_extent *ext;
1770 struct brw_page **pga = NULL;
1771 struct osc_brw_async_args *aa = NULL;
1772 struct obdo *oa = NULL;
1773 struct osc_async_page *oap;
1774 struct osc_object *obj = NULL;
1775 struct cl_req_attr *crattr = NULL;
1776 loff_t starting_offset = OBD_OBJECT_EOF;
1777 loff_t ending_offset = 0;
1781 bool soft_sync = false;
1782 bool interrupted = false;
1786 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1787 struct ost_body *body;
1789 LASSERT(!list_empty(ext_list));
1791 /* add pages into rpc_list to build BRW rpc */
1792 list_for_each_entry(ext, ext_list, oe_link) {
1793 LASSERT(ext->oe_state == OES_RPC);
1794 mem_tight |= ext->oe_memalloc;
1795 grant += ext->oe_grants;
1796 page_count += ext->oe_nr_pages;
1801 soft_sync = osc_over_unstable_soft_limit(cli);
1803 mpflag = cfs_memory_pressure_get_and_set();
1805 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1807 GOTO(out, rc = -ENOMEM);
1811 GOTO(out, rc = -ENOMEM);
1814 list_for_each_entry(ext, ext_list, oe_link) {
1815 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1817 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1819 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1820 pga[i] = &oap->oap_brw_page;
1821 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1824 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1825 if (starting_offset == OBD_OBJECT_EOF ||
1826 starting_offset > oap->oap_obj_off)
1827 starting_offset = oap->oap_obj_off;
1829 LASSERT(oap->oap_page_off == 0);
1830 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1831 ending_offset = oap->oap_obj_off +
1834 LASSERT(oap->oap_page_off + oap->oap_count ==
1836 if (oap->oap_interrupted)
1841 /* first page in the list */
1842 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
1844 crattr = &osc_env_info(env)->oti_req_attr;
1845 memset(crattr, 0, sizeof(*crattr));
1846 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1847 crattr->cra_flags = ~0ULL;
1848 crattr->cra_page = oap2cl_page(oap);
1849 crattr->cra_oa = oa;
1850 cl_req_attr_set(env, osc2cl(obj), crattr);
1852 if (cmd == OBD_BRW_WRITE)
1853 oa->o_grant_used = grant;
1855 sort_brw_pages(pga, page_count);
1856 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
1858 CERROR("prep_req failed: %d\n", rc);
1862 req->rq_commit_cb = brw_commit;
1863 req->rq_interpret_reply = brw_interpret;
1864 req->rq_memalloc = mem_tight != 0;
1865 oap->oap_request = ptlrpc_request_addref(req);
1866 if (interrupted && !req->rq_intr)
1867 ptlrpc_mark_interrupted(req);
1869 /* Need to update the timestamps after the request is built in case
1870 * we race with setattr (locally or in queue at OST). If OST gets
1871 * later setattr before earlier BRW (as determined by the request xid),
1872 * the OST will not use BRW timestamps. Sadly, there is no obvious
1873 * way to do this in a single call. bug 10150 */
1874 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1875 crattr->cra_oa = &body->oa;
1876 crattr->cra_flags = OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME;
1877 cl_req_attr_set(env, osc2cl(obj), crattr);
1878 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1880 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1881 aa = ptlrpc_req_async_args(req);
1882 INIT_LIST_HEAD(&aa->aa_oaps);
1883 list_splice_init(&rpc_list, &aa->aa_oaps);
1884 INIT_LIST_HEAD(&aa->aa_exts);
1885 list_splice_init(ext_list, &aa->aa_exts);
1887 spin_lock(&cli->cl_loi_list_lock);
1888 starting_offset >>= PAGE_CACHE_SHIFT;
1889 if (cmd == OBD_BRW_READ) {
1890 cli->cl_r_in_flight++;
1891 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1892 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1893 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1894 starting_offset + 1);
1896 cli->cl_w_in_flight++;
1897 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1898 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1899 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1900 starting_offset + 1);
1902 spin_unlock(&cli->cl_loi_list_lock);
1904 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1905 page_count, aa, cli->cl_r_in_flight,
1906 cli->cl_w_in_flight);
1907 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, 4);
1909 ptlrpcd_add_req(req);
1915 cfs_memory_pressure_restore(mpflag);
1918 LASSERT(req == NULL);
1923 OBD_FREE(pga, sizeof(*pga) * page_count);
1924 /* this should happen rarely and is pretty bad, it makes the
1925 * pending list not follow the dirty order */
1926 while (!list_empty(ext_list)) {
1927 ext = list_entry(ext_list->next, struct osc_extent,
1929 list_del_init(&ext->oe_link);
1930 osc_extent_finish(env, ext, 0, rc);
1936 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
1940 LASSERT(lock != NULL);
1942 lock_res_and_lock(lock);
1944 if (lock->l_ast_data == NULL)
1945 lock->l_ast_data = data;
1946 if (lock->l_ast_data == data)
1949 unlock_res_and_lock(lock);
1954 static int osc_enqueue_fini(struct ptlrpc_request *req,
1955 osc_enqueue_upcall_f upcall, void *cookie,
1956 struct lustre_handle *lockh, enum ldlm_mode mode,
1957 __u64 *flags, int agl, int errcode)
1959 bool intent = *flags & LDLM_FL_HAS_INTENT;
1963 /* The request was created before ldlm_cli_enqueue call. */
1964 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1965 struct ldlm_reply *rep;
1967 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1968 LASSERT(rep != NULL);
1970 rep->lock_policy_res1 =
1971 ptlrpc_status_ntoh(rep->lock_policy_res1);
1972 if (rep->lock_policy_res1)
1973 errcode = rep->lock_policy_res1;
1975 *flags |= LDLM_FL_LVB_READY;
1976 } else if (errcode == ELDLM_OK) {
1977 *flags |= LDLM_FL_LVB_READY;
1980 /* Call the update callback. */
1981 rc = (*upcall)(cookie, lockh, errcode);
1983 /* release the reference taken in ldlm_cli_enqueue() */
1984 if (errcode == ELDLM_LOCK_MATCHED)
1986 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1987 ldlm_lock_decref(lockh, mode);
1992 static int osc_enqueue_interpret(const struct lu_env *env,
1993 struct ptlrpc_request *req,
1994 struct osc_enqueue_args *aa, int rc)
1996 struct ldlm_lock *lock;
1997 struct lustre_handle *lockh = &aa->oa_lockh;
1998 enum ldlm_mode mode = aa->oa_mode;
1999 struct ost_lvb *lvb = aa->oa_lvb;
2000 __u32 lvb_len = sizeof(*lvb);
2005 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2007 lock = ldlm_handle2lock(lockh);
2008 LASSERTF(lock != NULL,
2009 "lockh "LPX64", req %p, aa %p - client evicted?\n",
2010 lockh->cookie, req, aa);
2012 /* Take an additional reference so that a blocking AST that
2013 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2014 * to arrive after an upcall has been executed by
2015 * osc_enqueue_fini(). */
2016 ldlm_lock_addref(lockh, mode);
2018 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2019 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2021 /* Let CP AST to grant the lock first. */
2022 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2025 LASSERT(aa->oa_lvb == NULL);
2026 LASSERT(aa->oa_flags == NULL);
2027 aa->oa_flags = &flags;
2030 /* Complete obtaining the lock procedure. */
2031 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2032 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2034 /* Complete osc stuff. */
2035 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2036 aa->oa_flags, aa->oa_agl, rc);
2038 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2040 ldlm_lock_decref(lockh, mode);
2041 LDLM_LOCK_PUT(lock);
2045 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2047 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2048 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2049 * other synchronous requests, however keeping some locks and trying to obtain
2050 * others may take a considerable amount of time in a case of ost failure; and
2051 * when other sync requests do not get released lock from a client, the client
2052 * is evicted from the cluster -- such scenarious make the life difficult, so
2053 * release locks just after they are obtained. */
2054 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2055 __u64 *flags, union ldlm_policy_data *policy,
2056 struct ost_lvb *lvb, int kms_valid,
2057 osc_enqueue_upcall_f upcall, void *cookie,
2058 struct ldlm_enqueue_info *einfo,
2059 struct ptlrpc_request_set *rqset, int async, int agl)
2061 struct obd_device *obd = exp->exp_obd;
2062 struct lustre_handle lockh = { 0 };
2063 struct ptlrpc_request *req = NULL;
2064 int intent = *flags & LDLM_FL_HAS_INTENT;
2065 __u64 match_flags = *flags;
2066 enum ldlm_mode mode;
2070 /* Filesystem lock extents are extended to page boundaries so that
2071 * dealing with the page cache is a little smoother. */
2072 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2073 policy->l_extent.end |= ~PAGE_MASK;
2076 * kms is not valid when either object is completely fresh (so that no
2077 * locks are cached), or object was evicted. In the latter case cached
2078 * lock cannot be used, because it would prime inode state with
2079 * potentially stale LVB.
2084 /* Next, search for already existing extent locks that will cover us */
2085 /* If we're trying to read, we also search for an existing PW lock. The
2086 * VFS and page cache already protect us locally, so lots of readers/
2087 * writers can share a single PW lock.
2089 * There are problems with conversion deadlocks, so instead of
2090 * converting a read lock to a write lock, we'll just enqueue a new
2093 * At some point we should cancel the read lock instead of making them
2094 * send us a blocking callback, but there are problems with canceling
2095 * locks out from other users right now, too. */
2096 mode = einfo->ei_mode;
2097 if (einfo->ei_mode == LCK_PR)
2100 match_flags |= LDLM_FL_LVB_READY;
2102 match_flags |= LDLM_FL_BLOCK_GRANTED;
2103 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2104 einfo->ei_type, policy, mode, &lockh, 0);
2106 struct ldlm_lock *matched;
2108 if (*flags & LDLM_FL_TEST_LOCK)
2111 matched = ldlm_handle2lock(&lockh);
2113 /* AGL enqueues DLM locks speculatively. Therefore if
2114 * it already exists a DLM lock, it wll just inform the
2115 * caller to cancel the AGL process for this stripe. */
2116 ldlm_lock_decref(&lockh, mode);
2117 LDLM_LOCK_PUT(matched);
2119 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2120 *flags |= LDLM_FL_LVB_READY;
2122 /* We already have a lock, and it's referenced. */
2123 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2125 ldlm_lock_decref(&lockh, mode);
2126 LDLM_LOCK_PUT(matched);
2129 ldlm_lock_decref(&lockh, mode);
2130 LDLM_LOCK_PUT(matched);
2135 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2139 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2140 &RQF_LDLM_ENQUEUE_LVB);
2144 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2146 ptlrpc_request_free(req);
2150 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2152 ptlrpc_request_set_replen(req);
2155 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2156 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2158 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2159 sizeof(*lvb), LVB_T_OST, &lockh, async);
2162 struct osc_enqueue_args *aa;
2163 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2164 aa = ptlrpc_req_async_args(req);
2166 aa->oa_mode = einfo->ei_mode;
2167 aa->oa_type = einfo->ei_type;
2168 lustre_handle_copy(&aa->oa_lockh, &lockh);
2169 aa->oa_upcall = upcall;
2170 aa->oa_cookie = cookie;
2173 aa->oa_flags = flags;
2176 /* AGL is essentially to enqueue an DLM lock
2177 * in advance, so we don't care about the
2178 * result of AGL enqueue. */
2180 aa->oa_flags = NULL;
2183 req->rq_interpret_reply =
2184 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2185 if (rqset == PTLRPCD_SET)
2186 ptlrpcd_add_req(req);
2188 ptlrpc_set_add_req(rqset, req);
2189 } else if (intent) {
2190 ptlrpc_req_finished(req);
2195 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2198 ptlrpc_req_finished(req);
2203 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2204 enum ldlm_type type, union ldlm_policy_data *policy,
2205 enum ldlm_mode mode, __u64 *flags, void *data,
2206 struct lustre_handle *lockh, int unref)
2208 struct obd_device *obd = exp->exp_obd;
2209 __u64 lflags = *flags;
2213 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2216 /* Filesystem lock extents are extended to page boundaries so that
2217 * dealing with the page cache is a little smoother */
2218 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2219 policy->l_extent.end |= ~PAGE_MASK;
2221 /* Next, search for already existing extent locks that will cover us */
2222 /* If we're trying to read, we also search for an existing PW lock. The
2223 * VFS and page cache already protect us locally, so lots of readers/
2224 * writers can share a single PW lock. */
2228 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2229 res_id, type, policy, rc, lockh, unref);
2230 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2234 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2236 LASSERT(lock != NULL);
2237 if (!osc_set_lock_data(lock, data)) {
2238 ldlm_lock_decref(lockh, rc);
2241 LDLM_LOCK_PUT(lock);
2246 static int osc_statfs_interpret(const struct lu_env *env,
2247 struct ptlrpc_request *req,
2248 struct osc_async_args *aa, int rc)
2250 struct obd_statfs *msfs;
2254 /* The request has in fact never been sent
2255 * due to issues at a higher level (LOV).
2256 * Exit immediately since the caller is
2257 * aware of the problem and takes care
2258 * of the clean up */
2261 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2262 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2268 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2270 GOTO(out, rc = -EPROTO);
2273 *aa->aa_oi->oi_osfs = *msfs;
2275 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2279 static int osc_statfs_async(struct obd_export *exp,
2280 struct obd_info *oinfo, __u64 max_age,
2281 struct ptlrpc_request_set *rqset)
2283 struct obd_device *obd = class_exp2obd(exp);
2284 struct ptlrpc_request *req;
2285 struct osc_async_args *aa;
2289 /* We could possibly pass max_age in the request (as an absolute
2290 * timestamp or a "seconds.usec ago") so the target can avoid doing
2291 * extra calls into the filesystem if that isn't necessary (e.g.
2292 * during mount that would help a bit). Having relative timestamps
2293 * is not so great if request processing is slow, while absolute
2294 * timestamps are not ideal because they need time synchronization. */
2295 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2299 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2301 ptlrpc_request_free(req);
2304 ptlrpc_request_set_replen(req);
2305 req->rq_request_portal = OST_CREATE_PORTAL;
2306 ptlrpc_at_set_req_timeout(req);
2308 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2309 /* procfs requests not want stat in wait for avoid deadlock */
2310 req->rq_no_resend = 1;
2311 req->rq_no_delay = 1;
2314 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2315 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2316 aa = ptlrpc_req_async_args(req);
2319 ptlrpc_set_add_req(rqset, req);
2323 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2324 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2326 struct obd_device *obd = class_exp2obd(exp);
2327 struct obd_statfs *msfs;
2328 struct ptlrpc_request *req;
2329 struct obd_import *imp = NULL;
2333 /*Since the request might also come from lprocfs, so we need
2334 *sync this with client_disconnect_export Bug15684*/
2335 down_read(&obd->u.cli.cl_sem);
2336 if (obd->u.cli.cl_import)
2337 imp = class_import_get(obd->u.cli.cl_import);
2338 up_read(&obd->u.cli.cl_sem);
2342 /* We could possibly pass max_age in the request (as an absolute
2343 * timestamp or a "seconds.usec ago") so the target can avoid doing
2344 * extra calls into the filesystem if that isn't necessary (e.g.
2345 * during mount that would help a bit). Having relative timestamps
2346 * is not so great if request processing is slow, while absolute
2347 * timestamps are not ideal because they need time synchronization. */
2348 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2350 class_import_put(imp);
2355 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2357 ptlrpc_request_free(req);
2360 ptlrpc_request_set_replen(req);
2361 req->rq_request_portal = OST_CREATE_PORTAL;
2362 ptlrpc_at_set_req_timeout(req);
2364 if (flags & OBD_STATFS_NODELAY) {
2365 /* procfs requests not want stat in wait for avoid deadlock */
2366 req->rq_no_resend = 1;
2367 req->rq_no_delay = 1;
2370 rc = ptlrpc_queue_wait(req);
2374 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2376 GOTO(out, rc = -EPROTO);
2383 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher. Pins the module for the duration of the call so
 * it cannot be unloaded mid-ioctl, then switches on cmd.
 *
 * NOTE(review): elided extract — the switch statement's opening, several
 * case bodies and the out: label are not visible here.
 */
2387 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2388 void *karg, void __user *uarg)
2390 struct obd_device *obd = exp->exp_obd;
2391 struct obd_ioctl_data *data = karg;
/* Refuse the ioctl entirely if the module refcount cannot be taken
 * (module is being unloaded). */
2395 if (!try_module_get(THIS_MODULE)) {
2396 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2397 module_name(THIS_MODULE));
/* Force recovery of the import; target name comes from inlbuf1. */
2401 case OBD_IOC_CLIENT_RECOVER:
2402 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2403 data->ioc_inlbuf1, 0);
/* Administratively (de)activate the import. */
2407 case IOC_OSC_SET_ACTIVE:
2408 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
/* Lightweight liveness check of the OST. */
2411 case OBD_IOC_PING_TARGET:
2412 err = ptlrpc_obd_ping(obd);
2415 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2416 cmd, current_comm());
2417 GOTO(out, err = -ENOTTY);
/* Balance the try_module_get() above. */
2420 module_put(THIS_MODULE);
/*
 * obd_set_info_async handler. Keys handled locally: KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK.
 * Everything else is forwarded to the OST as an OST_SET_INFO RPC
 * (KEY_GRANT_SHRINK uses the dedicated RQF_OST_SET_GRANT_INFO format and
 * goes through ptlrpcd; other keys are added to the caller's set).
 *
 * NOTE(review): elided extract — several returns, braces and the
 * RMF_SETINFO_VAL/ost_body selection lines are not visible here.
 */
2424 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2425 u32 keylen, void *key,
2426 u32 vallen, void *val,
2427 struct ptlrpc_request_set *set)
2429 struct ptlrpc_request *req;
2430 struct obd_device *obd = exp->exp_obd;
2431 struct obd_import *imp = class_exp2cliimp(exp);
2436 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* Toggle client-side data checksumming; handled purely locally. */
2438 if (KEY_IS(KEY_CHECKSUM)) {
2439 if (vallen != sizeof(int))
2441 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2445 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2446 sptlrpc_conf_client_adapt(obd);
2450 if (KEY_IS(KEY_FLUSH_CTX)) {
2451 sptlrpc_import_flush_my_ctx(imp);
/* Attach this OSC to the shared client-side page cache (set exactly
 * once per obd) and link it onto the cache's LRU list of OSCs. */
2455 if (KEY_IS(KEY_CACHE_SET)) {
2456 struct client_obd *cli = &obd->u.cli;
2458 LASSERT(cli->cl_cache == NULL); /* only once */
2459 cli->cl_cache = (struct cl_client_cache *)val;
2460 cl_cache_incref(cli->cl_cache);
2461 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2463 /* add this osc into entity list */
2464 LASSERT(list_empty(&cli->cl_lru_osc));
2465 spin_lock(&cli->cl_cache->ccc_lru_lock);
2466 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2467 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* Shrink at most half of this client's in-list LRU pages, capped by
 * the caller-requested target. */
2472 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2473 struct client_obd *cli = &obd->u.cli;
2474 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2475 long target = *(long *)val;
2477 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* Only GRANT_SHRINK may be sent without a request set (it goes via
 * ptlrpcd below); all other forwarded keys require one. */
2482 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2485 /* We pass all other commands directly to OST. Since nobody calls osc
2486 methods directly and everybody is supposed to go through LOV, we
2487 assume lov checked invalid values for us.
2488 The only recognised values so far are evict_by_nid and mds_conn.
2489 Even if something bad goes through, we'd get a -EINVAL from OST
2492 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2493 &RQF_OST_SET_GRANT_INFO :
2498 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2499 RCL_CLIENT, keylen);
/* GRANT_SHRINK carries its value in an ost_body field rather than a
 * generic SETINFO_VAL buffer — presumably why the size is skipped here;
 * verify against the full request format definition. */
2500 if (!KEY_IS(KEY_GRANT_SHRINK))
2501 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2502 RCL_CLIENT, vallen);
2503 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2505 ptlrpc_request_free(req);
2509 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2510 memcpy(tmp, key, keylen);
2511 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2514 memcpy(tmp, val, vallen);
2516 if (KEY_IS(KEY_GRANT_SHRINK)) {
2517 struct osc_grant_args *aa;
/* Stash the ost_body oa in the request's async-args slot so the
 * interpret callback can use it after the reply arrives. */
2520 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2521 aa = ptlrpc_req_async_args(req);
2524 ptlrpc_req_finished(req);
2527 *oa = ((struct ost_body *)val)->oa;
2529 req->rq_interpret_reply = osc_shrink_grant_interpret;
2532 ptlrpc_request_set_replen(req);
2533 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2534 LASSERT(set != NULL);
2535 ptlrpc_set_add_req(set, req);
2536 ptlrpc_check_set(NULL, set);
/* GRANT_SHRINK path: hand the request to a ptlrpcd daemon instead. */
2538 ptlrpcd_add_req(req);
/*
 * Reconnect hook: recompute the grant we ask the server for on
 * reconnection from locally tracked state (available + reserved grant,
 * plus dirty grant/pages depending on GRANT_PARAM support), and report
 * any grant lost across the disconnect.
 *
 * NOTE(review): elided extract — local variable declarations (grant,
 * lost_grant), the else branch pairing of line 2561 and the function's
 * return are not visible here.
 */
2544 static int osc_reconnect(const struct lu_env *env,
2545 struct obd_export *exp, struct obd_device *obd,
2546 struct obd_uuid *cluuid,
2547 struct obd_connect_data *data,
2550 struct client_obd *cli = &obd->u.cli;
2552 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
/* Grant bookkeeping is protected by cl_loi_list_lock. */
2556 spin_lock(&cli->cl_loi_list_lock);
2557 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2558 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2559 grant += cli->cl_dirty_grant;
2561 grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
/* If no grant is held, ask for two full BRW RPCs' worth as a floor. */
2562 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2563 lost_grant = cli->cl_lost_grant;
2564 cli->cl_lost_grant = 0;
2565 spin_unlock(&cli->cl_loi_list_lock);
2567 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2568 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2569 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect the export, then — only after the import is known to be
 * gone — remove this client from the grant-shrink list. The ordering is
 * deliberate; see the race description below (bug 18662).
 *
 * NOTE(review): elided extract — the return statement and part of the
 * race diagram are not visible here.
 */
2575 static int osc_disconnect(struct obd_export *exp)
2577 struct obd_device *obd = class_exp2obd(exp);
2580 rc = client_disconnect_export(exp);
2582 * Initially we put del_shrink_grant before disconnect_export, but it
2583 * causes the following problem if setup (connect) and cleanup
2584 * (disconnect) are tangled together.
2585 * connect p1 disconnect p2
2586 * ptlrpc_connect_import
2587 * ............... class_manual_cleanup
2590 * ptlrpc_connect_interrupt
2592 * add this client to shrink list
2594 * Bang! pinger trigger the shrink.
2595 * So the osc should be disconnected from the shrink list, after we
2596 * are sure the import has been destroyed. BUG18662
2598 if (obd->u.cli.cl_import == NULL)
2599 osc_del_shrink_grant(&obd->u.cli);
/*
 * cfs_hash iterator callback: for one LDLM resource, find the osc_object
 * attached to any granted lock (taking a cl_object reference), clear the
 * LDLM_FL_CLEANED flag on every granted lock so the second
 * ldlm_namespace_cleanup() pass in osc_import_event() cancels them, then
 * invalidate the object and drop the reference.
 *
 * NOTE(review): elided extract — resource locking around the list walk
 * and the return are not visible here.
 */
2603 static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
2604 struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
2606 struct lu_env *env = arg;
2607 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2608 struct ldlm_lock *lock;
2609 struct osc_object *osc = NULL;
2613 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* All granted locks on one resource share the same object; grab it
 * from the first lock that has ast_data set. */
2614 if (lock->l_ast_data != NULL && osc == NULL) {
2615 osc = lock->l_ast_data;
2616 cl_object_get(osc2cl(osc));
2619 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2620 * by the 2nd round of ldlm_namespace_clean() call in
2621 * osc_import_event(). */
2622 ldlm_clear_cleaned(lock);
2627 osc_object_invalidate(env, osc);
2628 cl_object_put(env, osc2cl(osc));
/*
 * Import state-change notifier for the OSC. Resets grant on disconnect,
 * flushes cached pages and cleans the lock namespace on invalidation,
 * re-initializes grant and the request portal on connect-data arrival,
 * and forwards ACTIVE/INACTIVE/(DE)ACTIVATE events to the obd observer.
 *
 * NOTE(review): elided extract — the switch opening, break statements,
 * cli initialization and the return are not visible here.
 */
2634 static int osc_import_event(struct obd_device *obd,
2635 struct obd_import *imp,
2636 enum obd_import_event event)
2638 struct client_obd *cli;
2642 LASSERT(imp->imp_obd == obd);
/* Connection lost: any grant held locally is no longer trustworthy. */
2645 case IMP_EVENT_DISCON: {
2647 spin_lock(&cli->cl_loi_list_lock);
2648 cli->cl_avail_grant = 0;
2649 cli->cl_lost_grant = 0;
2650 spin_unlock(&cli->cl_loi_list_lock);
2653 case IMP_EVENT_INACTIVE: {
2654 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/* Import invalidated: purge local locks, flush dirty pages, then
 * invalidate every cached object via the hash iterator, and run a
 * second namespace cleanup to cancel locks unpinned by the first. */
2657 case IMP_EVENT_INVALIDATE: {
2658 struct ldlm_namespace *ns = obd->obd_namespace;
2662 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2664 env = cl_env_get(&refcheck);
2666 osc_io_unplug(env, &obd->u.cli, NULL);
2668 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2669 osc_ldlm_resource_invalidate,
2671 cl_env_put(env, &refcheck);
2673 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2678 case IMP_EVENT_ACTIVE: {
2679 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* Connect data arrived: (re)initialize grant accounting and switch
 * I/O to the OST request portal if the server supports it. */
2682 case IMP_EVENT_OCD: {
2683 struct obd_connect_data *ocd = &imp->imp_connect_data;
2685 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2686 osc_init_grant(&obd->u.cli, ocd);
2689 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2690 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2692 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2695 case IMP_EVENT_DEACTIVATE: {
2696 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2699 case IMP_EVENT_ACTIVATE: {
2700 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2704 CERROR("Unknown import event %d\n", event);
2711 * Determine whether the lock can be canceled before replaying the lock
2712 * during recovery, see bug16774 for detailed information.
2714 * \retval zero the lock can't be canceled
2715 * \retval other ok to cancel
2717 static int osc_cancel_weight(struct ldlm_lock *lock)
2720 * Cancel all unused and granted extent lock.
/* Cancelable iff: extent lock, fully granted (granted mode equals the
 * requested mode), and unused per osc_ldlm_weigh_ast() == 0. */
2722 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2723 lock->l_granted_mode == lock->l_req_mode &&
2724 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback (registered via ptlrpcd_alloc_work in osc_setup):
 * flush pending cached I/O for this client by unplugging the I/O queue.
 */
2730 static int brw_queue_work(const struct lu_env *env, void *data)
2732 struct client_obd *cli = data;
2734 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2736 osc_io_unplug(env, cli, NULL);
/*
 * OBD setup method for the OSC device: take a ptlrpcd reference, do the
 * generic client setup, allocate writeback and LRU ptlrpcd work items,
 * set up quota, register procfs entries, grow the shared request pool,
 * register the cancel-weight callback and add this client to the global
 * shrink list.
 *
 * NOTE(review): elided extract — local declarations (handler, rc,
 * req_count, adding, added), several error checks/returns and the error
 * labels' exact placement are not visible here.
 */
2740 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2742 struct client_obd *cli = &obd->u.cli;
2743 struct obd_type *type;
2751 rc = ptlrpcd_addref();
2755 rc = client_obd_setup(obd, lcfg);
2757 GOTO(out_ptlrpcd, rc);
/* Dedicated ptlrpcd work items: one for writeback, one for LRU. */
2759 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2760 if (IS_ERR(handler))
2761 GOTO(out_client_setup, rc = PTR_ERR(handler));
2762 cli->cl_writeback_work = handler;
2764 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2765 if (IS_ERR(handler))
2766 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2767 cli->cl_lru_work = handler;
2769 rc = osc_quota_setup(obd);
2771 GOTO(out_ptlrpcd_work, rc);
2773 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2775 #ifdef CONFIG_PROC_FS
2776 obd->obd_vars = lprocfs_osc_obd_vars;
2778 /* If this is true then both client (osc) and server (osp) are on the
2779 * same node. The osp layer if loaded first will register the osc proc
2780 * directory. In that case this obd_device will be attached its proc
2781 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2782 type = class_search_type(LUSTRE_OSP_NAME);
2783 if (type && type->typ_procsym) {
2784 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2786 obd->obd_vars, obd);
2787 if (IS_ERR(obd->obd_proc_entry)) {
2788 rc = PTR_ERR(obd->obd_proc_entry);
2789 CERROR("error %d setting up lprocfs for %s\n", rc,
/* procfs failure is non-fatal: continue without the entry. */
2791 obd->obd_proc_entry = NULL;
2794 rc = lprocfs_obd_setup(obd);
2797 /* If the basic OSC proc tree construction succeeded then
2798 * lets do the rest. */
2800 lproc_osc_attach_seqstat(obd);
2801 sptlrpc_lprocfs_cliobd_attach(obd);
2802 ptlrpc_lprocfs_register_obd(obd);
2806 * We try to control the total number of requests with a upper limit
2807 * osc_reqpool_maxreqcount. There might be some race which will cause
2808 * over-limit allocation, but it is fine.
/* Grow the shared OSC request pool by up to max_rpcs_in_flight + 2,
 * clamped to the global cap; racy read is tolerated (see above). */
2810 req_count = atomic_read(&osc_pool_req_count);
2811 if (req_count < osc_reqpool_maxreqcount) {
2812 adding = cli->cl_max_rpcs_in_flight + 2;
2813 if (req_count + adding > osc_reqpool_maxreqcount)
2814 adding = osc_reqpool_maxreqcount - req_count;
2816 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
2817 atomic_add(added, &osc_pool_req_count);
2820 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2821 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* Make this client visible to the memory-pressure cache shrinker. */
2823 spin_lock(&osc_shrink_lock);
2824 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
2825 spin_unlock(&osc_shrink_lock);
/* Error unwinding: destroy whichever work items were created, then
 * tear down the generic client obd state. */
2830 if (cli->cl_writeback_work != NULL) {
2831 ptlrpcd_destroy_work(cli->cl_writeback_work);
2832 cli->cl_writeback_work = NULL;
2834 if (cli->cl_lru_work != NULL) {
2835 ptlrpcd_destroy_work(cli->cl_lru_work);
2836 cli->cl_lru_work = NULL;
2839 client_obd_cleanup(obd);
/*
 * Pre-cleanup method: wait out zombie exports (echo-client case), destroy
 * the ptlrpcd work items, tear down the import and unregister procfs —
 * the reverse of what osc_setup() established.
 */
2845 static int osc_precleanup(struct obd_device *obd)
2847 struct client_obd *cli = &obd->u.cli;
2851 * for echo client, export may be on zombie list, wait for
2852 * zombie thread to cull it, because cli.cl_import will be
2853 * cleared in client_disconnect_export():
2854 * class_export_destroy() -> obd_cleanup() ->
2855 * echo_device_free() -> echo_client_cleanup() ->
2856 * obd_disconnect() -> osc_disconnect() ->
2857 * client_disconnect_export()
2859 obd_zombie_barrier();
2860 if (cli->cl_writeback_work) {
2861 ptlrpcd_destroy_work(cli->cl_writeback_work);
2862 cli->cl_writeback_work = NULL;
2865 if (cli->cl_lru_work) {
2866 ptlrpcd_destroy_work(cli->cl_lru_work);
2867 cli->cl_lru_work = NULL;
2870 obd_cleanup_client_import(obd);
2871 ptlrpc_lprocfs_unregister_obd(obd);
2872 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup method: unhook this client from the global shrink list,
 * detach from the shared client cache (dropping our reference), free the
 * quota cache and finish with the generic client cleanup.
 */
2876 int osc_cleanup(struct obd_device *obd)
2878 struct client_obd *cli = &obd->u.cli;
2883 spin_lock(&osc_shrink_lock);
2884 list_del(&cli->cl_shrink_list);
2885 spin_unlock(&osc_shrink_lock);
/* Mirror of the KEY_CACHE_SET attach in osc_set_info_async(). */
2888 if (cli->cl_cache != NULL) {
2889 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2890 spin_lock(&cli->cl_cache->ccc_lru_lock);
2891 list_del_init(&cli->cl_lru_osc);
2892 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2893 cli->cl_lru_left = NULL;
2894 cl_cache_decref(cli->cl_cache);
2895 cli->cl_cache = NULL;
2898 /* free memory of osc quota cache */
2899 osc_quota_cleanup(obd);
2901 rc = client_obd_cleanup(obd);
/*
 * Apply an OSC proc-style config record. class_process_proc_param()
 * returns >0 on "handled"; normalize that to 0 and pass errors through.
 */
2907 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2909 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2910 return rc > 0 ? 0: rc;
/* obd_ops adapter: forward config buffers to osc_process_config_base(). */
2913 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2915 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type, registered in osc_init().
 * Connection management is delegated to the generic client_* helpers;
 * everything else is implemented in this file or its siblings.
 */
2918 static struct obd_ops osc_obd_ops = {
2919 .o_owner = THIS_MODULE,
2920 .o_setup = osc_setup,
2921 .o_precleanup = osc_precleanup,
2922 .o_cleanup = osc_cleanup,
2923 .o_add_conn = client_import_add_conn,
2924 .o_del_conn = client_import_del_conn,
2925 .o_connect = client_connect_import,
2926 .o_reconnect = osc_reconnect,
2927 .o_disconnect = osc_disconnect,
2928 .o_statfs = osc_statfs,
2929 .o_statfs_async = osc_statfs_async,
2930 .o_create = osc_create,
2931 .o_destroy = osc_destroy,
2932 .o_getattr = osc_getattr,
2933 .o_setattr = osc_setattr,
2934 .o_iocontrol = osc_iocontrol,
2935 .o_set_info_async = osc_set_info_async,
2936 .o_import_event = osc_import_event,
2937 .o_process_config = osc_process_config,
2938 .o_quotactl = osc_quotactl,
/* Global shrinker state: every live OSC client is linked into
 * osc_shrink_list (under osc_shrink_lock) so the registered cache
 * shrinker can walk them under memory pressure. */
2941 static struct shrinker *osc_cache_shrinker;
2942 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
2943 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Compatibility wrapper for kernels whose shrinker API has a single
 * combined callback instead of separate count/scan methods: build a
 * shrink_control from the legacy arguments, run the scan, and return
 * the count.
 */
2945 #ifndef HAVE_SHRINKER_COUNT
2946 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
2948 struct shrink_control scv = {
2949 .nr_to_scan = shrink_param(sc, nr_to_scan),
2950 .gfp_mask = shrink_param(sc, gfp_mask)
2952 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
2953 struct shrinker *shrinker = NULL;
2956 (void)osc_cache_shrink_scan(shrinker, &scv);
2958 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: set up lu_kmem caches, register the OSC obd type
 * (suppressing proc registration when OSP owns the proc directory),
 * register the cache shrinker, size and create the shared request pool.
 *
 * NOTE(review): elided extract — rc declaration, some error checks,
 * reqsize initialization (line 3000 loops from an unseen starting
 * value) and the final return/error labels are not all visible here.
 */
2962 static int __init osc_init(void)
2964 bool enable_proc = true;
2965 struct obd_type *type;
2966 unsigned int reqpool_size;
2967 unsigned int reqsize;
2969 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
2970 osc_cache_shrink_count, osc_cache_shrink_scan);
2973 /* print an address of _any_ initialized kernel symbol from this
2974 * module, to allow debugging with gdb that doesn't support data
2975 * symbols from modules.*/
2976 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2978 rc = lu_kmem_init(osc_caches);
/* If OSP is already loaded and owns the proc directory, skip proc
 * registration for the OSC type (see comment in osc_setup()). */
2982 type = class_search_type(LUSTRE_OSP_NAME);
2983 if (type != NULL && type->typ_procsym != NULL)
2984 enable_proc = false;
2986 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2987 LUSTRE_OSC_NAME, &osc_device_type);
2991 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
/* Sanity-bound the tunable: osc_reqpool_mem_max is in MB; reject 0 and
 * anything >= 4096 MB to avoid overflow in the shift below. */
2993 /* This is obviously too much memory, only prevent overflow here */
2994 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
2995 GOTO(out_type, rc = -EINVAL);
2997 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two covering
 * OST_IO_MAXREQSIZE. */
3000 while (reqsize < OST_IO_MAXREQSIZE)
3001 reqsize = reqsize << 1;
3004 * We don't enlarge the request count in OSC pool according to
3005 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3006 * tried after normal allocation failed. So a small OSC pool won't
3007 * cause much performance degression in most of cases.
3009 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3011 atomic_set(&osc_pool_req_count, 0);
3012 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3013 ptlrpc_add_rqs_to_pool);
3015 if (osc_rq_pool != NULL)
/* Error path: unwind the type registration and kmem caches. */
3019 class_unregister_type(LUSTRE_OSC_NAME);
3021 lu_kmem_fini(osc_caches);
/* Module exit: undo osc_init() in reverse order — shrinker, obd type,
 * kmem caches, request pool. */
3026 static void __exit osc_exit(void)
3028 remove_shrinker(osc_cache_shrinker);
3029 class_unregister_type(LUSTRE_OSC_NAME);
3030 lu_kmem_fini(osc_caches);
3031 ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit hookup. */
3034 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3035 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3036 MODULE_VERSION(LUSTRE_VERSION_STRING);
3037 MODULE_LICENSE("GPL");
3039 module_init(osc_init);
3040 module_exit(osc_exit);