4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include <libcfs/libcfs.h>
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
/* Per-request private state for asynchronous bulk read/write (BRW) RPCs.
 * Stored in req->rq_async_args (see CLASSERT sizeof check at the use sites)
 * and read back by the BRW interpret callback.
 * NOTE(review): this listing is sampled -- some fields and the closing brace
 * are not visible here; confirm layout against the full file. */
55 struct osc_brw_async_args {
61 struct brw_page **aa_ppga;
62 struct client_obd *aa_cli;
63 struct list_head aa_oaps;
64 struct list_head aa_exts;
65 struct obd_capa *aa_ocapa;
66 struct cl_req *aa_clerq;
/* Grant-shrink RPCs reuse the same async-args layout under another name. */
69 #define osc_grant_args osc_brw_async_args
/* Async-args cookies for setattr, fsync and ldlm-enqueue RPCs.  Each carries
 * the caller's upcall + cookie so the interpret callback can notify the
 * caller when the reply arrives.
 * NOTE(review): sampled listing -- additional fields and closing braces are
 * not visible here. */
71 struct osc_setattr_args {
73 obd_enqueue_update_f sa_upcall;
77 struct osc_fsync_args {
78 struct obd_info *fa_oi;
79 obd_enqueue_update_f fa_upcall;
83 struct osc_enqueue_args {
84 struct obd_export *oa_exp;
88 osc_enqueue_upcall_f oa_upcall;
90 struct ost_lvb *oa_lvb;
91 struct lustre_handle oa_lockh;
/* AGL = asynchronous glimpse lock flag (single-bit field). */
92 unsigned int oa_agl:1;
/* Forward declarations for helpers defined later in the file. */
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack the capability @capa into the request capsule (RMF_CAPA1 field) and
 * flag its presence in the obdo via OBD_MD_FLOSSCAPA.
 * NOTE(review): the NULL-capa early return is not visible in this sampled
 * listing -- presumably the function is a no-op when capa == NULL; verify. */
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100 struct ost_body *body, void *capa)
102 struct obd_capa *oc = (struct obd_capa *)capa;
103 struct lustre_capa *c;
108 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
111 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112 DEBUG_CAPA(D_SEC, c, "pack");
/* Fill the request's OST body: copy the caller's obdo into wire format
 * (honouring the import's connect data for compatibility) and pack the
 * capability, if any, from @oinfo. */
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
117 struct ost_body *body;
119 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
122 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
124 osc_pack_capa(req, body, oinfo->oi_capa);
/* Size the capability field of the (not yet packed) request capsule: zero
 * when no capability will be sent, otherwise the default size.
 * NOTE(review): the capa != NULL branch is not visible in this sampled
 * listing -- the visible line handles only the "no capa" case. */
127 void osc_set_capa_size(struct ptlrpc_request *req,
128 const struct req_msg_field *field,
132 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
134 /* it is already calculated as sizeof struct obd_capa */
/* Synchronous OST_GETATTR: allocate/pack the request, wait for the reply,
 * and copy the returned attributes back into oinfo->oi_oa.  The client's
 * preferred BRW size is substituted for o_blksize. */
138 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
139 struct obd_info *oinfo)
141 struct ptlrpc_request *req;
142 struct ost_body *body;
146 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
150 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
151 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* Packing failed: release the request skeleton. */
153 ptlrpc_request_free(req);
157 osc_pack_req_body(req, oinfo);
159 ptlrpc_request_set_replen(req);
161 rc = ptlrpc_queue_wait(req);
165 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* No/short reply body is a protocol error. */
167 GOTO(out, rc = -EPROTO);
169 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
170 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
/* Report the client-side optimal I/O size rather than the server's. */
173 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
174 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
178 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: send the attributes in oinfo->oi_oa to the OST
 * and copy the server's resulting attributes back on success.  The obdo must
 * carry a valid group (FID sequence) -- asserted below. */
182 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
183 struct obd_info *oinfo)
185 struct ptlrpc_request *req;
186 struct ost_body *body;
190 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
192 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
196 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
197 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
199 ptlrpc_request_free(req);
203 osc_pack_req_body(req, oinfo);
205 ptlrpc_request_set_replen(req);
207 rc = ptlrpc_queue_wait(req);
211 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
213 GOTO(out, rc = -EPROTO);
215 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
220 ptlrpc_req_finished(req);
/* Interpret callback for async setattr/punch replies: unpack the returned
 * obdo into sa->sa_oa, then invoke the caller's upcall with the final rc. */
224 static int osc_setattr_interpret(const struct lu_env *env,
225 struct ptlrpc_request *req,
226 struct osc_setattr_args *sa, int rc)
228 struct ost_body *body;
234 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
236 GOTO(out, rc = -EPROTO);
238 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
241 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs the request and either hands it to
 * ptlrpcd (fire-and-forget or PTLRPCD_SET) or adds it to the caller's
 * request set.  @upcall/@cookie are invoked from osc_setattr_interpret.
 * NOTE(review): the two ptlrpcd_add_req() calls below (before setting
 * rq_interpret_reply, and in the PTLRPCD_SET branch) look contradictory in
 * this sampled listing; the missing `if (rqset == NULL) ... else` scaffolding
 * presumably separates them -- confirm against the full file. */
245 int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
246 obd_enqueue_update_f upcall, void *cookie,
247 struct ptlrpc_request_set *rqset)
249 struct ptlrpc_request *req;
250 struct osc_setattr_args *sa;
254 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
258 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
259 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
261 ptlrpc_request_free(req);
265 osc_pack_req_body(req, oinfo);
267 ptlrpc_request_set_replen(req);
269 /* do mds to ost setattr asynchronously */
271 /* Do not wait for response. */
272 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
274 req->rq_interpret_reply =
275 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* Caller state rides in the request's embedded async-args area. */
277 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
278 sa = ptlrpc_req_async_args(req);
279 sa->sa_oa = oinfo->oi_oa;
280 sa->sa_upcall = upcall;
281 sa->sa_cookie = cookie;
283 if (rqset == PTLRPCD_SET)
284 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
286 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo-client objects here (the
 * LASSERT requires an echo FID sequence); normal object creation goes
 * through the OSP.  On success the server's obdo is copied back into @oa. */
292 static int osc_create(const struct lu_env *env, struct obd_export *exp,
295 struct ptlrpc_request *req;
296 struct ost_body *body;
301 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
302 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
304 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
306 GOTO(out, rc = -ENOMEM);
308 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
310 ptlrpc_request_free(req);
314 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
317 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
319 ptlrpc_request_set_replen(req);
321 rc = ptlrpc_queue_wait(req);
325 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
327 GOTO(out_req, rc = -EPROTO);
329 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
330 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Advertise the client-side optimal I/O size. */
332 oa->o_blksize = cli_brw_size(exp->exp_obd);
333 oa->o_valid |= OBD_MD_FLBLKSZ;
335 CDEBUG(D_HA, "transno: "LPD64"\n",
336 lustre_msg_get_transno(req->rq_repmsg));
338 ptlrpc_req_finished(req);
/* Asynchronous OST_PUNCH (truncate).  Sent on the OST I/O portal (bug 7198)
 * with adaptive timeouts; completion is reported through
 * osc_setattr_interpret -> @upcall(@cookie, rc). */
343 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
344 obd_enqueue_update_f upcall, void *cookie,
345 struct ptlrpc_request_set *rqset)
347 struct ptlrpc_request *req;
348 struct osc_setattr_args *sa;
349 struct ost_body *body;
353 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
357 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
358 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
360 ptlrpc_request_free(req);
363 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
364 ptlrpc_at_set_req_timeout(req);
366 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
368 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
370 osc_pack_capa(req, body, oinfo->oi_capa);
372 ptlrpc_request_set_replen(req);
/* Punch replies carry an obdo, so the setattr interpreter is reused. */
374 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
375 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
376 sa = ptlrpc_req_async_args(req);
377 sa->sa_oa = oinfo->oi_oa;
378 sa->sa_upcall = upcall;
379 sa->sa_cookie = cookie;
380 if (rqset == PTLRPCD_SET)
381 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
383 ptlrpc_set_add_req(rqset, req);
/* Interpret callback for OST_SYNC replies: copy the returned obdo into the
 * caller's obd_info and invoke the caller's upcall with the final rc. */
388 static int osc_sync_interpret(const struct lu_env *env,
389 struct ptlrpc_request *req,
392 struct osc_fsync_args *fa = arg;
393 struct ost_body *body;
399 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
401 CERROR ("can't unpack ost_body\n");
402 GOTO(out, rc = -EPROTO);
/* Struct copy: the whole reply obdo goes back to the caller. */
405 *fa->fa_oi->oi_oa = body->oa;
407 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Asynchronous OST_SYNC.  The size/blocks fields of the obdo are overloaded
 * to carry the start/end byte range to flush (see comment below).  Completion
 * is reported through osc_sync_interpret -> @upcall(@cookie, rc). */
411 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
412 obd_enqueue_update_f upcall, void *cookie,
413 struct ptlrpc_request_set *rqset)
415 struct ptlrpc_request *req;
416 struct ost_body *body;
417 struct osc_fsync_args *fa;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
425 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
426 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
428 ptlrpc_request_free(req);
432 /* overload the size and blocks fields in the oa with start/end */
433 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
435 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
437 osc_pack_capa(req, body, oinfo->oi_capa);
439 ptlrpc_request_set_replen(req);
440 req->rq_interpret_reply = osc_sync_interpret;
442 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
443 fa = ptlrpc_req_async_args(req);
445 fa->fa_upcall = upcall;
446 fa->fa_cookie = cookie;
448 if (rqset == PTLRPCD_SET)
449 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
451 ptlrpc_set_add_req(rqset, req);
456 /* Find and cancel locally locks matched by @mode in the resource found by
457 * @objid. Found locks are added into @cancel list. Returns the amount of
458 * locks added to @cancels list. */
459 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
460 struct list_head *cancels,
461 ldlm_mode_t mode, __u64 lock_flags)
463 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
464 struct ldlm_res_id res_id;
465 struct ldlm_resource *res;
469 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
470 * export) but disabled through procfs (flag in NS).
472 * This distinguishes from a case when ELC is not supported originally,
473 * when we still want to cancel locks in advance and just cancel them
474 * locally, without sending any RPC. */
475 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Build the resource name from the object id and collect matching locks. */
478 ostid_build_res_name(&oa->o_oi, &res_id);
479 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
483 LDLM_RESOURCE_ADDREF(res);
484 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
485 lock_flags, 0, NULL);
486 LDLM_RESOURCE_DELREF(res);
487 ldlm_resource_putref(res);
/* Interpret callback for OST_DESTROY: drop this RPC from the in-flight
 * destroy count and wake any thread throttled in osc_destroy(). */
491 static int osc_destroy_interpret(const struct lu_env *env,
492 struct ptlrpc_request *req, void *data,
495 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
497 atomic_dec(&cli->cl_destroy_in_flight);
498 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve a slot for a destroy RPC: optimistically bump the in-flight
 * counter; if that exceeded cl_max_rpcs_in_flight, undo the bump.  The
 * inc/dec pair is racy by design, so a lost wakeup is repaired by waking the
 * destroy waitqueue when the counter is observed back under the limit. */
502 static int osc_can_send_destroy(struct client_obd *cli)
504 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
505 cli->cl_max_rpcs_in_flight) {
506 /* The destroy request can be sent */
509 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
510 cli->cl_max_rpcs_in_flight) {
512 * The counter has been modified between the two atomic
515 wake_up(&cli->cl_destroy_waitq);
/* Asynchronous OST_DESTROY.  First cancels any local PW locks on the object
 * (early lock cancel, piggy-backed on the RPC via ldlm_prep_elc_req), then
 * throttles destroys to cl_max_rpcs_in_flight before handing the request to
 * ptlrpcd without waiting for the reply. */
520 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
523 struct client_obd *cli = &exp->exp_obd->u.cli;
524 struct ptlrpc_request *req;
525 struct ost_body *body;
526 struct list_head cancels = LIST_HEAD_INIT(cancels);
531 CDEBUG(D_INFO, "oa NULL\n");
/* Data is being discarded, so locally cancel conflicting PW locks first. */
535 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
536 LDLM_FL_DISCARD_DATA);
538 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* Allocation failed: release the locks collected for ELC. */
540 ldlm_lock_list_put(&cancels, l_bl_ast, count);
544 osc_set_capa_size(req, &RMF_CAPA1, NULL);
545 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
548 ptlrpc_request_free(req);
552 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
553 ptlrpc_at_set_req_timeout(req);
555 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
557 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
559 ptlrpc_request_set_replen(req);
561 req->rq_interpret_reply = osc_destroy_interpret;
562 if (!osc_can_send_destroy(cli)) {
/* Interruptible wait: signals abort the throttle wait, not the destroy. */
563 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
566 * Wait until the number of on-going destroy RPCs drops
567 * under max_rpc_in_flight
569 l_wait_event_exclusive(cli->cl_destroy_waitq,
570 osc_can_send_destroy(cli), &lwi);
573 /* Do not wait for response */
574 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/* Report client cache/grant state to the server inside an outgoing obdo:
 * dirty byte count, how much more the client could dirty (o_undirty), the
 * grant currently held (o_grant) and grant lost since last report
 * (o_dropped).  Sanity CERRORs fire if the dirty accounting looks corrupt. */
578 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
581 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
583 LASSERT(!(oa->o_valid & bits));
586 spin_lock(&cli->cl_loi_list_lock);
587 oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
588 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
589 cli->cl_dirty_max_pages)) {
590 CERROR("dirty %lu - %lu > dirty_max %lu\n",
591 cli->cl_dirty_pages, cli->cl_dirty_transit,
592 cli->cl_dirty_max_pages);
594 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
595 atomic_long_read(&obd_dirty_transit_pages) >
596 (obd_max_dirty_pages + 1))) {
597 /* The atomic_read() allowing the atomic_inc() are
598 * not covered by a lock thus they may safely race and trip
599 * this CERROR() unless we add in a small fudge factor (+1). */
600 CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
601 cli->cl_import->imp_obd->obd_name,
602 atomic_long_read(&obd_dirty_pages),
603 atomic_long_read(&obd_dirty_transit_pages),
604 obd_max_dirty_pages);
606 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
608 CERROR("dirty %lu - dirty_max %lu too big???\n",
609 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
/* Normal case: compute how much more the client may dirty, bounded by
 * what could be in flight at once (max_pages_per_rpc * (rpcs+1)). */
612 unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
614 (cli->cl_max_rpcs_in_flight + 1);
615 oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
618 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
619 oa->o_dropped = cli->cl_lost_grant;
620 cli->cl_lost_grant = 0;
621 spin_unlock(&cli->cl_loi_list_lock);
622 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
623 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt cl_grant_shrink_interval seconds
 * from now. */
627 void osc_update_next_shrink(struct client_obd *cli)
629 cli->cl_next_shrink_grant =
630 cfs_time_shift(cli->cl_grant_shrink_interval);
631 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
632 cli->cl_next_shrink_grant);
/* Add @grant bytes to the client's available grant under cl_loi_list_lock. */
635 static void __osc_update_grant(struct client_obd *cli, u64 grant)
637 spin_lock(&cli->cl_loi_list_lock);
638 cli->cl_avail_grant += grant;
639 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in a reply body. */
642 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
644 if (body->oa.o_valid & OBD_MD_FLGRANT) {
645 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
646 __osc_update_grant(cli, body->oa.o_grant);
/* Forward declaration: grant-shrink sends its body via set_info_async. */
650 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
651 u32 keylen, void *key,
652 u32 vallen, void *val,
653 struct ptlrpc_request_set *set);
/* Interpret callback for a grant-shrink RPC: on failure the grant we tried
 * to give back is restored locally; on success any server-granted amount in
 * the reply is absorbed. */
655 static int osc_shrink_grant_interpret(const struct lu_env *env,
656 struct ptlrpc_request *req,
659 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
660 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
661 struct ost_body *body;
/* RPC failed: reclaim the grant locally (failure path -- see missing
 * `if (rc != 0)` scaffolding in the full file). */
664 __osc_update_grant(cli, oa->o_grant);
668 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
670 osc_update_grant(cli, body);
/* Piggy-back a grant shrink on an outgoing BRW: give back a quarter of the
 * currently available grant via the obdo and mark it with
 * OBD_FL_SHRINK_GRANT, then reschedule the next shrink. */
676 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
678 spin_lock(&cli->cl_loi_list_lock);
679 oa->o_grant = cli->cl_avail_grant / 4;
680 cli->cl_avail_grant -= oa->o_grant;
681 spin_unlock(&cli->cl_loi_list_lock);
682 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
683 oa->o_valid |= OBD_MD_FLFLAGS;
686 oa->o_flags |= OBD_FL_SHRINK_GRANT;
687 osc_update_next_shrink(cli);
690 /* Shrink the current grant, either from some large amount to enough for a
691 * full set of in-flight RPCs, or if we have already shrunk to that limit
692 * then to enough for a single RPC. This avoids keeping more grant than
693 * needed, and avoids shrinking the grant piecemeal. */
694 static int osc_shrink_grant(struct client_obd *cli)
696 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
697 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
699 spin_lock(&cli->cl_loi_list_lock);
/* Already at/below the full-pipeline target: fall back to one RPC's worth. */
700 if (cli->cl_avail_grant <= target_bytes)
701 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
702 spin_unlock(&cli->cl_loi_list_lock);
704 return osc_shrink_grant_to_target(cli, target_bytes);
/* Return grant to the server until only @target_bytes remain, by sending a
 * KEY_GRANT_SHRINK set_info RPC carrying an ost_body.  The target is clamped
 * to at least one RPC's worth; nothing is sent if we already hold less than
 * the target.  On send failure the deducted grant is restored locally. */
707 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
710 struct ost_body *body;
713 spin_lock(&cli->cl_loi_list_lock);
714 /* Don't shrink if we are already above or below the desired limit
715 * We don't want to shrink below a single RPC, as that will negatively
716 * impact block allocation and long-term performance. */
717 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
718 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
720 if (target_bytes >= cli->cl_avail_grant) {
721 spin_unlock(&cli->cl_loi_list_lock);
724 spin_unlock(&cli->cl_loi_list_lock);
730 osc_announce_cached(cli, &body->oa, 0);
732 spin_lock(&cli->cl_loi_list_lock);
/* The difference between what we hold and the target is given back. */
733 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
734 cli->cl_avail_grant = target_bytes;
735 spin_unlock(&cli->cl_loi_list_lock);
736 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
737 body->oa.o_valid |= OBD_MD_FLFLAGS;
738 body->oa.o_flags = 0;
740 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
741 osc_update_next_shrink(cli);
743 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
744 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
745 sizeof(*body), body, NULL);
/* Send failed: take the grant back locally (error path). */
747 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether it is time to shrink grant: the server must support
 * OBD_CONNECT_GRANT_SHRINK, the shrink deadline must have passed (with a
 * small CFS_TICK fudge), the import must be FULL, and we must hold more
 * grant than a single RPC needs. */
752 static int osc_should_shrink_grant(struct client_obd *client)
754 cfs_time_t time = cfs_time_current();
755 cfs_time_t next_shrink = client->cl_next_shrink_grant;
757 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
758 OBD_CONNECT_GRANT_SHRINK) == 0)
761 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
762 /* Get the current RPC size directly, instead of going via:
763 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
764 * Keep comment here so that it can be found by searching. */
765 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
767 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
768 client->cl_avail_grant > brw_size)
/* Not worth shrinking now; just push the deadline forward. */
771 osc_update_next_shrink(client);
/* Periodic timeout callback: walk every client on this timeout item and
 * shrink grant for those that are due. */
776 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
778 struct client_obd *client;
780 list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
781 if (osc_should_shrink_grant(client))
782 osc_shrink_grant(client);
/* Register @client with the ptlrpc timeout machinery so that
 * osc_grant_shrink_grant_cb fires every cl_grant_shrink_interval seconds,
 * and arm the first shrink deadline. */
787 static int osc_add_shrink_grant(struct client_obd *client)
791 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
793 osc_grant_shrink_grant_cb, NULL,
794 &client->cl_grant_shrink_list);
796 CERROR("add grant client %s error %d\n",
797 client->cl_import->imp_obd->obd_name, rc);
800 CDEBUG(D_CACHE, "add grant client %s \n",
801 client->cl_import->imp_obd->obd_name);
802 osc_update_next_shrink(client);
/* Unregister @client from the grant-shrink timeout list. */
806 static int osc_del_shrink_grant(struct client_obd *client)
808 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/* Initialize grant accounting from the server's connect data at (re)connect
 * time: after an eviction ocd_grant is the whole new grant; otherwise dirty
 * pages already consume part of it.  Also derives the osc_extent chunk size
 * from the server block size and registers for periodic grant shrinking if
 * the server supports it. */
812 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
815 * ocd_grant is the total grant amount we're expect to hold: if we've
816 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
817 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
820 * race is tolerable here: if we're evicted, but imp_state already
821 * left EVICTED state, then cl_dirty_pages must be 0 already.
823 spin_lock(&cli->cl_loi_list_lock);
824 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
825 cli->cl_avail_grant = ocd->ocd_grant;
827 cli->cl_avail_grant = ocd->ocd_grant -
828 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
830 if (cli->cl_avail_grant < 0) {
831 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
832 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
833 ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
834 /* workaround for servers which do not have the patch from
836 cli->cl_avail_grant = ocd->ocd_grant;
839 /* determine the appropriate chunk size used by osc_extent. */
840 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
841 spin_unlock(&cli->cl_loi_list_lock);
843 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
844 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
845 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
847 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
848 list_empty(&cli->cl_grant_shrink_list))
849 osc_add_shrink_grant(cli);
852 /* We assume that the reason this OSC got a short read is because it read
853 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
854 * via the LOV, and it _knows_ it's reading inside the file, it's just that
855 * this stripe never got written at or beyond this stripe offset yet. */
/* Zero-fill the tail of the page array past the @nob_read bytes actually
 * returned: partial last page first, then any wholly-unread pages. */
856 static void handle_short_read(int nob_read, size_t page_count,
857 struct brw_page **pga)
862 /* skip bytes read OK */
863 while (nob_read > 0) {
864 LASSERT (page_count > 0);
866 if (pga[i]->count > nob_read) {
867 /* EOF inside this page */
868 ptr = kmap(pga[i]->pg) +
869 (pga[i]->off & ~PAGE_MASK);
870 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
877 nob_read -= pga[i]->count;
882 /* zero remaining pages */
883 while (page_count-- > 0) {
884 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
885 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes of a BRW_WRITE reply: the RC vector
 * must be present and fully sized, every rc must be exactly 0, and the bulk
 * descriptor must report exactly the requested number of bytes moved. */
891 static int check_write_rcs(struct ptlrpc_request *req,
892 int requested_nob, int niocount,
893 size_t page_count, struct brw_page **pga)
898 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
899 sizeof(*remote_rcs) *
901 if (remote_rcs == NULL) {
902 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
906 /* return error if any niobuf was in error */
907 for (i = 0; i < niocount; i++) {
908 if ((int)remote_rcs[i] < 0)
909 return(remote_rcs[i]);
/* Positive rc is also invalid for a write niobuf. */
911 if (remote_rcs[i] != 0) {
912 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
913 i, remote_rcs[i], req);
918 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
919 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
920 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages may share one niobuf iff they are byte-contiguous and their
 * flags agree on everything outside the harmless OBD_BRW_* mask below.
 * Unknown differing flags are merged anyway but reported with a CWARN. */
927 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
929 if (p1->flag != p2->flag) {
930 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
931 OBD_BRW_SYNC | OBD_BRW_ASYNC |
932 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
934 /* warn if we try to combine flags that we don't know to be
936 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
937 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
938 "report this at https://jira.hpdd.intel.com/\n",
/* Contiguity check: p2 must start exactly where p1 ends. */
944 return (p1->off + p1->count == p2->off);
/* Compute the bulk checksum over the first @nob bytes of the page array
 * using the libcfs crypto hash selected by @cksum_type.  Includes two fault
 * injection points: corrupting received data (OST_READ) and corrupting only
 * the computed checksum on send (OST_WRITE), so a resend stays correct. */
947 static u32 osc_checksum_bulk(int nob, size_t pg_count,
948 struct brw_page **pga, int opc,
949 cksum_type_t cksum_type)
953 struct cfs_crypto_hash_desc *hdesc;
954 unsigned int bufsize;
956 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
958 LASSERT(pg_count > 0);
960 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
962 CERROR("Unable to initialize checksum hash %s\n",
963 cfs_crypto_hash_name(cfs_alg));
964 return PTR_ERR(hdesc);
967 while (nob > 0 && pg_count > 0) {
968 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
970 /* corrupt the data before we compute the checksum, to
971 * simulate an OST->client data error */
972 if (i == 0 && opc == OST_READ &&
973 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
974 unsigned char *ptr = kmap(pga[i]->pg);
975 int off = pga[i]->off & ~PAGE_MASK;
977 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
980 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
981 pga[i]->off & ~PAGE_MASK,
983 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
984 (int)(pga[i]->off & ~PAGE_MASK));
986 nob -= pga[i]->count;
991 bufsize = sizeof(cksum);
992 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
994 /* For sending we only compute the wrong checksum instead
995 * of corrupting the data so it is still correct on a redo */
996 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Build a complete BRW (bulk read/write) request for @page_count pages:
 * allocates the request (from the pre-allocated pool for writes, so writeout
 * can proceed under memory pressure), prepares the bulk descriptor, packs
 * obdo/ioobj/niobufs (merging contiguous pages into single niobufs), piggy-
 * backs cache/grant announcements and optional checksums, and stashes the
 * async-args for the interpret callback.  *reqp receives the request. */
1003 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1004 u32 page_count, struct brw_page **pga,
1005 struct ptlrpc_request **reqp, struct obd_capa *ocapa,
1006 int reserve, int resend)
1008 struct ptlrpc_request *req;
1009 struct ptlrpc_bulk_desc *desc;
1010 struct ost_body *body;
1011 struct obd_ioobj *ioobj;
1012 struct niobuf_remote *niobuf;
1013 int niocount, i, requested_nob, opc, rc;
1014 struct osc_brw_async_args *aa;
1015 struct req_capsule *pill;
1016 struct brw_page *pg_prev;
1019 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1020 RETURN(-ENOMEM); /* Recoverable */
1021 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1022 RETURN(-EINVAL); /* Fatal */
1024 if ((cmd & OBD_BRW_WRITE) != 0) {
/* Writes come from the request pool so cleaning dirty pages cannot
 * deadlock on allocation. */
1026 req = ptlrpc_request_alloc_pool(cli->cl_import,
1027 cli->cl_import->imp_rq_pool,
1028 &RQF_OST_BRW_WRITE);
1031 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Count niobufs: contiguous same-flag pages collapse into one. */
1036 for (niocount = i = 1; i < page_count; i++) {
1037 if (!can_merge_pages(pga[i - 1], pga[i]))
1041 pill = &req->rq_pill;
1042 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1044 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1045 niocount * sizeof(*niobuf));
1046 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1048 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1050 ptlrpc_request_free(req);
1053 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1054 ptlrpc_at_set_req_timeout(req);
1055 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1057 req->rq_no_retry_einprogress = 1;
1059 desc = ptlrpc_prep_bulk_imp(req, page_count,
1060 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1061 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1065 GOTO(out, rc = -ENOMEM);
1066 /* NB request now owns desc and will free it when it gets freed */
1068 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1069 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1070 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1071 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1073 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1075 obdo_to_ioobj(oa, ioobj);
1076 ioobj->ioo_bufcnt = niocount;
1077 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1078 * that might be send for this request. The actual number is decided
1079 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1080 * "max - 1" for old client compatibility sending "0", and also so the
1081 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1082 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1083 osc_pack_capa(req, body, ocapa);
1084 LASSERT(page_count > 0);
1086 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1087 struct brw_page *pg = pga[i];
1088 int poff = pg->off & ~PAGE_MASK;
1090 LASSERT(pg->count > 0);
1091 /* make sure there is no gap in the middle of page array */
1092 LASSERTF(page_count == 1 ||
1093 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1094 ergo(i > 0 && i < page_count - 1,
1095 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1096 ergo(i == page_count - 1, poff == 0)),
1097 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1098 i, page_count, pg, pg->off, pg->count);
1099 LASSERTF(i == 0 || pg->off > pg_prev->off,
1100 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1101 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1103 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1104 pg_prev->pg, page_private(pg_prev->pg),
1105 pg_prev->pg->index, pg_prev->off);
1106 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1107 (pg->flag & OBD_BRW_SRVLOCK));
1109 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1110 requested_nob += pg->count;
1112 if (i > 0 && can_merge_pages(pg_prev, pg)) {
/* Extend the current niobuf instead of starting a new one. */
1114 niobuf->rnb_len += pg->count;
1116 niobuf->rnb_offset = pg->off;
1117 niobuf->rnb_len = pg->count;
1118 niobuf->rnb_flags = pg->flag;
/* Sanity: we must have filled exactly @niocount niobufs. */
1123 LASSERTF((void *)(niobuf - niocount) ==
1124 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1125 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1126 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1128 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1130 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1131 body->oa.o_valid |= OBD_MD_FLFLAGS;
1132 body->oa.o_flags = 0;
/* Mark resends so the server can detect replayed bulk. */
1134 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1137 if (osc_should_shrink_grant(cli))
1138 osc_shrink_grant_local(cli, &body->oa);
1140 /* size[REQ_REC_OFF] still sizeof (*body) */
1141 if (opc == OST_WRITE) {
1142 if (cli->cl_checksum &&
1143 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1144 /* store cl_cksum_type in a local variable since
1145 * it can be changed via lprocfs */
1146 cksum_type_t cksum_type = cli->cl_cksum_type;
1148 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1149 oa->o_flags &= OBD_FL_LOCAL_MASK;
1150 body->oa.o_flags = 0;
1152 body->oa.o_flags |= cksum_type_pack(cksum_type);
1153 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1154 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1158 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1160 /* save this in 'oa', too, for later checking */
1161 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1162 oa->o_flags |= cksum_type_pack(cksum_type);
1164 /* clear out the checksum flag, in case this is a
1165 * resend but cl_checksum is no longer set. b=11238 */
1166 oa->o_valid &= ~OBD_MD_FLCKSUM;
1168 oa->o_cksum = body->oa.o_cksum;
1169 /* 1 RC per niobuf */
1170 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1171 sizeof(__u32) * niocount);
/* Read path: just request a checksummed reply if enabled. */
1173 if (cli->cl_checksum &&
1174 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1175 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1176 body->oa.o_flags = 0;
1177 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1178 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1181 ptlrpc_request_set_replen(req);
1183 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1184 aa = ptlrpc_req_async_args(req);
1186 aa->aa_requested_nob = requested_nob;
1187 aa->aa_nio_count = niocount;
1188 aa->aa_page_count = page_count;
1192 INIT_LIST_HEAD(&aa->aa_oaps);
1193 if (ocapa && reserve)
1194 aa->aa_ocapa = capa_get(ocapa);
1197 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1198 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1199 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1200 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* Error path: release the request (desc is owned by it). */
1204 ptlrpc_req_finished(req);
/* Diagnose a write checksum mismatch reported by the server: recompute the
 * checksum over the local pages with the server's checksum type and classify
 * where the corruption happened (server type mismatch, client-side change
 * after checksumming, in-transit corruption, or both), then log loudly. */
1208 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1209 __u32 client_cksum, __u32 server_cksum, int nob,
1210 size_t page_count, struct brw_page **pga,
1211 cksum_type_t client_cksum_type)
1215 cksum_type_t cksum_type;
1217 if (server_cksum == client_cksum) {
1218 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Recompute with the type the server actually used. */
1222 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1224 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1227 if (cksum_type != client_cksum_type)
1228 msg = "the server did not use the checksum type specified in "
1229 "the original request - likely a protocol problem";
1230 else if (new_cksum == server_cksum)
1231 msg = "changed on the client after we checksummed it - "
1232 "likely false positive due to mmap IO (bug 11742)";
1233 else if (new_cksum == client_cksum)
1234 msg = "changed in transit before arrival at OST";
1236 msg = "changed in transit AND doesn't match the original - "
1237 "likely false positive due to mmap IO (bug 11742)";
1239 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1240 " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1241 msg, libcfs_nid2str(peer->nid),
1242 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1243 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1244 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1245 POSTID(&oa->o_oi), pga[0]->off,
1246 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1247 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1248 "client csum now %x\n", client_cksum, client_cksum_type,
1249 server_cksum, cksum_type, new_cksum);
/*
 * Post-process a completed BRW (bulk read/write) RPC reply: unpack the reply
 * body, update quota flags and grant, verify write checksums (and return-code
 * vector) for OST_WRITE, or verify transferred length / short reads /
 * read checksum for OST_READ, then copy the wire obdo back into aa->aa_oa.
 * NOTE(review): this extract has gaps (error-path returns, some braces);
 * the control flow shown here is not complete.
 */
1253 /* Note rc enters this function as number of bytes transferred */
1254 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1256 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1257 const lnet_process_id_t *peer =
1258 &req->rq_import->imp_connection->c_peer;
1259 struct client_obd *cli = aa->aa_cli;
1260 struct ost_body *body;
1261 u32 client_cksum = 0;
/* -EDQUOT replies still carry a body we must process (quota flags). */
1264 if (rc < 0 && rc != -EDQUOT) {
1265 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1269 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1270 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1272 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1276 /* set/clear over quota flag for a uid/gid */
1277 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1278 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1279 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1281 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1282 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1284 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
/* Account the grant the server returned in this reply. */
1287 osc_update_grant(cli, body);
/* Save the checksum we sent; reply processing may overwrite aa_oa. */
1292 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1293 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1295 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1297 CERROR("Unexpected +ve rc %d\n", rc);
1300 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1302 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Write path: cross-check our checksum against the server's. */
1305 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1306 check_write_checksum(&body->oa, peer, client_cksum,
1307 body->oa.o_cksum, aa->aa_requested_nob,
1308 aa->aa_page_count, aa->aa_ppga,
1309 cksum_type_unpack(aa->aa_oa->o_flags)))
/* Validate the per-niobuf return codes from the server. */
1312 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1313 aa->aa_page_count, aa->aa_ppga);
1317 /* The rest of this function executes only for OST_READs */
1319 /* if unwrap_bulk failed, return -EAGAIN to retry */
1320 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1322 GOTO(out, rc = -EAGAIN);
/* rc is bytes transferred; it may never exceed what we asked for. */
1324 if (rc > aa->aa_requested_nob) {
1325 CERROR("Unexpected rc %d (%d requested)\n", rc,
1326 aa->aa_requested_nob);
1330 if (rc != req->rq_bulk->bd_nob_transferred) {
1331 CERROR ("Unexpected rc %d (%d transferred)\n",
1332 rc, req->rq_bulk->bd_nob_transferred);
/* Short read: zero-fill the tail pages past the returned length. */
1336 if (rc < aa->aa_requested_nob)
1337 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1339 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1340 static int cksum_counter;
1341 u32 server_cksum = body->oa.o_cksum;
1344 cksum_type_t cksum_type;
1346 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1347 body->oa.o_flags : 0);
1348 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1349 aa->aa_ppga, OST_READ,
/* If a router forwarded the bulk, name it in any error report. */
1352 if (peer->nid != req->rq_bulk->bd_sender) {
1354 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1357 if (server_cksum != client_cksum) {
1358 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1359 "%s%s%s inode "DFID" object "DOSTID
1360 " extent ["LPU64"-"LPU64"]\n",
1361 req->rq_import->imp_obd->obd_name,
1362 libcfs_nid2str(peer->nid),
1364 body->oa.o_valid & OBD_MD_FLFID ?
1365 body->oa.o_parent_seq : (__u64)0,
1366 body->oa.o_valid & OBD_MD_FLFID ?
1367 body->oa.o_parent_oid : 0,
1368 body->oa.o_valid & OBD_MD_FLFID ?
1369 body->oa.o_parent_ver : 0,
1370 POSTID(&body->oa.o_oi),
1371 aa->aa_ppga[0]->off,
1372 aa->aa_ppga[aa->aa_page_count-1]->off +
1373 aa->aa_ppga[aa->aa_page_count-1]->count -
1375 CERROR("client %x, server %x, cksum_type %x\n",
1376 client_cksum, server_cksum, cksum_type);
1378 aa->aa_oa->o_cksum = client_cksum;
1382 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* We asked for a checksum but the server did not send one;
 * rate-limit the complaint (power-of-two counter trick). */
1385 } else if (unlikely(client_cksum)) {
1386 static int cksum_missed;
1389 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1390 CERROR("Checksum %u requested from %s but not sent\n",
1391 cksum_missed, libcfs_nid2str(peer->nid));
/* Copy the server-updated obdo back for the caller. */
1397 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1398 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a failed BRW RPC after a recoverable error (e.g.
 * -EINPROGRESS): prepare a fresh request over the same pages/obdo, transfer
 * ownership of the async pages, extents and capa from the old request to the
 * new one, apply a capped resend back-off, and hand the new request to a
 * ptlrpcd thread.
 * NOTE(review): this extract is missing lines (loop-exit returns, braces);
 * read as incomplete.
 */
1403 static int osc_brw_redo_request(struct ptlrpc_request *request,
1404 struct osc_brw_async_args *aa, int rc)
1406 struct ptlrpc_request *new_req;
1407 struct osc_brw_async_args *new_aa;
1408 struct osc_async_page *oap;
/* -EINPROGRESS resends are routine, log quietly; others are errors. */
1411 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1412 "redo for recoverable error %d", rc);
1414 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1415 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1416 aa->aa_cli, aa->aa_oa,
1417 aa->aa_page_count, aa->aa_ppga,
1418 &new_req, aa->aa_ocapa, 0, 1);
/* Abort the redo if any page's I/O was interrupted meanwhile. */
1422 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1423 if (oap->oap_request != NULL) {
1424 LASSERTF(request == oap->oap_request,
1425 "request %p != oap_request %p\n",
1426 request, oap->oap_request);
1427 if (oap->oap_interrupted) {
1428 ptlrpc_req_finished(new_req);
1433 /* New request takes over pga and oaps from old request.
1434 * Note that copying a list_head doesn't work, need to move it... */
1436 new_req->rq_interpret_reply = request->rq_interpret_reply;
1437 new_req->rq_async_args = request->rq_async_args;
1438 new_req->rq_commit_cb = request->rq_commit_cb;
1439 /* cap resend delay to the current request timeout, this is similar to
1440 * what ptlrpc does (see after_reply()) */
1441 if (aa->aa_resends > new_req->rq_timeout)
1442 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1444 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1445 new_req->rq_generation_set = 1;
1446 new_req->rq_import_generation = request->rq_import_generation;
1448 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new request. */
1450 INIT_LIST_HEAD(&new_aa->aa_oaps);
1451 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1452 INIT_LIST_HEAD(&new_aa->aa_exts);
1453 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1454 new_aa->aa_resends = aa->aa_resends;
/* Re-point each page's request reference at the new request. */
1456 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1457 if (oap->oap_request) {
1458 ptlrpc_req_finished(oap->oap_request);
1459 oap->oap_request = ptlrpc_request_addref(new_req);
/* Capa ownership transfers to the new request. */
1463 new_aa->aa_ocapa = aa->aa_ocapa;
1464 aa->aa_ocapa = NULL;
1466 /* XXX: This code will run into problem if we're going to support
1467 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1468 * and wait for all of them to be finished. We should inherit request
1469 * set from old request. */
1470 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1472 DEBUG_REQ(D_INFO, new_req, "new request");
/*
 * Sort the brw_page array by file offset (ascending) so the target can
 * allocate disk blocks in offset order.  Implemented as a shellsort with the
 * Knuth 3h+1 gap sequence — in-place, no allocation, fine for small arrays.
 * NOTE(review): some loop-body lines are missing from this extract.
 */
1477 * ugh, we want disk allocation on the target to happen in offset order. we'll
1478 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1479 * fine for our small page arrays and doesn't require allocation. its an
1480 * insertion sort that swaps elements that are strides apart, shrinking the
1481 * stride down until its '1' and the array is sorted.
1483 static void sort_brw_pages(struct brw_page **array, int num)
1486 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num. */
1490 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* Gapped insertion sort pass for the current stride. */
1495 for (i = stride ; i < num ; i++) {
1498 while (j >= stride && array[j - stride]->off > tmp->off) {
1499 array[j] = array[j - stride];
1504 } while (stride > 1);
/*
 * Free a brw_page pointer array of @count entries previously allocated with
 * OBD_ALLOC (see osc_build_rpc).  Frees only the array, not the pages it
 * points to.
 */
1507 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1509 LASSERT(ppga != NULL);
1510 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * rq_interpret_reply callback for BRW RPCs: finish reply processing, resend
 * on recoverable errors (bounded by client_should_resend, unbounded for
 * -EINPROGRESS), then on final completion update the cl_object attributes
 * (size/blocks/times/KMS), finish all extents, release the page array, and
 * decrement the in-flight RPC counters before re-plugging the I/O queue.
 * NOTE(review): this extract has gaps (returns, braces, some else arms);
 * control flow as shown is incomplete.
 */
1513 static int brw_interpret(const struct lu_env *env,
1514 struct ptlrpc_request *req, void *data, int rc)
1516 struct osc_brw_async_args *aa = data;
1517 struct osc_extent *ext;
1518 struct osc_extent *tmp;
1519 struct client_obd *cli = aa->aa_cli;
1522 rc = osc_brw_fini_request(req, rc);
1523 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1524 /* When server return -EINPROGRESS, client should always retry
1525 * regardless of the number of times the bulk was resent already. */
1526 if (osc_recoverable_error(rc)) {
/* Import generation changed: we were evicted, do not resend. */
1527 if (req->rq_import_generation !=
1528 req->rq_import->imp_generation) {
1529 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1530 ""DOSTID", rc = %d.\n",
1531 req->rq_import->imp_obd->obd_name,
1532 POSTID(&aa->aa_oa->o_oi), rc);
1533 } else if (rc == -EINPROGRESS ||
1534 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1535 rc = osc_brw_redo_request(req, aa, rc);
1537 CERROR("%s: too many resent retries for object: "
1538 ""LPU64":"LPU64", rc = %d.\n",
1539 req->rq_import->imp_obd->obd_name,
1540 POSTID(&aa->aa_oa->o_oi), rc);
1545 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Drop the capa reference held for this RPC. */
1550 capa_put(aa->aa_ocapa);
1551 aa->aa_ocapa = NULL;
1555 struct obdo *oa = aa->aa_oa;
1556 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1557 unsigned long valid = 0;
1558 struct cl_object *obj;
1559 struct osc_async_page *last;
/* The last page (post-sort) bounds the extent this RPC covered. */
1561 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
1562 obj = osc2cl(last->oap_obj);
/* Fold server-returned attributes into the cl_object under lock. */
1564 cl_object_attr_lock(obj);
1565 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1566 attr->cat_blocks = oa->o_blocks;
1567 valid |= CAT_BLOCKS;
1569 if (oa->o_valid & OBD_MD_FLMTIME) {
1570 attr->cat_mtime = oa->o_mtime;
1573 if (oa->o_valid & OBD_MD_FLATIME) {
1574 attr->cat_atime = oa->o_atime;
1577 if (oa->o_valid & OBD_MD_FLCTIME) {
1578 attr->cat_ctime = oa->o_ctime;
1582 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1583 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
1584 loff_t last_off = last->oap_count + last->oap_obj_off +
1587 /* Change file size if this is an out of quota or
1588 * direct IO write and it extends the file size */
1589 if (loi->loi_lvb.lvb_size < last_off) {
1590 attr->cat_size = last_off;
1593 /* Extend KMS if it's not a lockless write */
1594 if (loi->loi_kms < last_off &&
1595 oap2osc_page(last)->ops_srvlock == 0) {
1596 attr->cat_kms = last_off;
1602 cl_object_attr_update(env, obj, attr, valid);
1603 cl_object_attr_unlock(obj);
1605 OBDO_FREE(aa->aa_oa);
/* Successful write: pages stay "unstable" until server commit. */
1607 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
1608 osc_inc_unstable_pages(req);
1610 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1611 list_del_init(&ext->oe_link);
1612 osc_extent_finish(env, ext, 1, rc);
1614 LASSERT(list_empty(&aa->aa_exts));
1615 LASSERT(list_empty(&aa->aa_oaps));
1617 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1618 req->rq_bulk->bd_nob_transferred);
1619 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1620 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1622 spin_lock(&cli->cl_loi_list_lock);
1623 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1624 * is called so we know whether to go to sync BRWs or wait for more
1625 * RPCs to complete */
1626 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1627 cli->cl_w_in_flight--;
1629 cli->cl_r_in_flight--;
1630 osc_wake_cache_waiters(cli);
1631 spin_unlock(&cli->cl_loi_list_lock);
/* A slot just freed up; try to launch more queued I/O. */
1633 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * rq_commit_cb for BRW requests: called when the server reports the
 * transaction committed.  Clears rq_unstable (dropping the unstable-pages
 * accounting) when it was set, otherwise records rq_committed so the race
 * partner osc_inc_unstable_pages/osc_extent_finish can do the decrement.
 * Both arms are serialized on rq_lock.
 */
1637 static void brw_commit(struct ptlrpc_request *req)
1639 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1640 * this called via the rq_commit_cb, I need to ensure
1641 * osc_dec_unstable_pages is still called. Otherwise unstable
1642 * pages may be leaked. */
1643 spin_lock(&req->rq_lock);
1644 if (likely(req->rq_unstable)) {
1645 req->rq_unstable = 0;
1646 spin_unlock(&req->rq_lock);
/* Drop the lock before the accounting call; it may be heavier. */
1648 osc_dec_unstable_pages(req);
1650 req->rq_committed = 1;
1651 spin_unlock(&req->rq_lock);
/*
 * Assemble one BRW RPC from a list of extents in OES_RPC state: collect all
 * async pages into rpc_list, allocate the page array / obdo / cl_req,
 * sort pages by offset, prepare the ptlrpc request, wire up the interpret
 * and commit callbacks, account in-flight counters and histograms, and hand
 * the request to ptlrpcd.  On error the extents are finished with the error.
 * NOTE(review): this extract is missing lines (returns, error labels, some
 * variable declarations such as i/page_count/mem_tight/rc); incomplete view.
 */
1656 * Build an RPC by the list of extent @ext_list. The caller must ensure
1657 * that the total pages in this list are NOT over max pages per RPC.
1658 * Extents in the list must be in OES_RPC state.
1660 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1661 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1663 struct ptlrpc_request *req = NULL;
1664 struct osc_extent *ext;
1665 struct brw_page **pga = NULL;
1666 struct osc_brw_async_args *aa = NULL;
1667 struct obdo *oa = NULL;
1668 struct osc_async_page *oap;
1669 struct osc_async_page *tmp;
1670 struct cl_req *clerq = NULL;
1671 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1673 struct cl_req_attr *crattr = NULL;
1674 loff_t starting_offset = OBD_OBJECT_EOF;
1675 loff_t ending_offset = 0;
1679 bool soft_sync = false;
1682 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
1683 struct ost_body *body;
1685 LASSERT(!list_empty(ext_list));
1687 /* add pages into rpc_list to build BRW rpc */
1688 list_for_each_entry(ext, ext_list, oe_link) {
1689 LASSERT(ext->oe_state == OES_RPC);
1690 mem_tight |= ext->oe_memalloc;
1691 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1693 list_add_tail(&oap->oap_rpc_item, &rpc_list);
/* Track the min/max byte range covered by this RPC. */
1694 if (starting_offset == OBD_OBJECT_EOF ||
1695 starting_offset > oap->oap_obj_off)
1696 starting_offset = oap->oap_obj_off;
1698 LASSERT(oap->oap_page_off == 0);
1699 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1700 ending_offset = oap->oap_obj_off +
1703 LASSERT(oap->oap_page_off + oap->oap_count ==
1708 soft_sync = osc_over_unstable_soft_limit(cli);
/* Mark memalloc context so allocations below can dip into reserves. */
1710 mpflag = cfs_memory_pressure_get_and_set();
1712 OBD_ALLOC(crattr, sizeof(*crattr));
1714 GOTO(out, rc = -ENOMEM);
1716 OBD_ALLOC(pga, sizeof(*pga) * page_count);
1718 GOTO(out, rc = -ENOMEM);
1722 GOTO(out, rc = -ENOMEM);
1725 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1726 struct cl_page *page = oap2cl_page(oap);
/* Lazily allocate the cl_req on the first page. */
1727 if (clerq == NULL) {
1728 clerq = cl_req_alloc(env, page, crt,
1729 1 /* only 1-object rpcs for now */);
1731 GOTO(out, rc = PTR_ERR(clerq));
1734 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1736 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1737 pga[i] = &oap->oap_brw_page;
1738 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1739 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1740 pga[i]->pg, page_index(oap->oap_page), oap,
1743 cl_req_page_add(env, clerq, page);
1746 /* always get the data for the obdo for the rpc */
1747 LASSERT(clerq != NULL);
1748 crattr->cra_oa = oa;
1749 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1751 rc = cl_req_prep(env, clerq);
1753 CERROR("cl_req_prep failed: %d\n", rc);
/* Offset-order the pages so the OST allocates blocks sequentially. */
1757 sort_brw_pages(pga, page_count);
1758 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req,
1759 crattr->cra_capa, 1, 0);
1761 CERROR("prep_req failed: %d\n", rc);
1765 req->rq_commit_cb = brw_commit;
1766 req->rq_interpret_reply = brw_interpret;
1769 req->rq_memalloc = 1;
1771 /* Need to update the timestamps after the request is built in case
1772 * we race with setattr (locally or in queue at OST). If OST gets
1773 * later setattr before earlier BRW (as determined by the request xid),
1774 * the OST will not use BRW timestamps. Sadly, there is no obvious
1775 * way to do this in a single call. bug 10150 */
1776 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1777 crattr->cra_oa = &body->oa;
1778 cl_req_attr_set(env, clerq, crattr,
1779 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1781 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1783 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1784 aa = ptlrpc_req_async_args(req);
/* The request takes ownership of the oap and extent lists. */
1785 INIT_LIST_HEAD(&aa->aa_oaps);
1786 list_splice_init(&rpc_list, &aa->aa_oaps);
1787 INIT_LIST_HEAD(&aa->aa_exts);
1788 list_splice_init(ext_list, &aa->aa_exts);
1789 aa->aa_clerq = clerq;
1791 /* queued sync pages can be torn down while the pages
1792 * were between the pending list and the rpc */
1794 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1795 /* only one oap gets a request reference */
1798 if (oap->oap_interrupted && !req->rq_intr) {
1799 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1801 ptlrpc_mark_interrupted(req);
1805 tmp->oap_request = ptlrpc_request_addref(req);
/* Bump in-flight counts and lprocfs histograms under the list lock. */
1807 spin_lock(&cli->cl_loi_list_lock);
1808 starting_offset >>= PAGE_CACHE_SHIFT;
1809 if (cmd == OBD_BRW_READ) {
1810 cli->cl_r_in_flight++;
1811 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1812 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1813 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1814 starting_offset + 1);
1816 cli->cl_w_in_flight++;
1817 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1818 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1819 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1820 starting_offset + 1);
1822 spin_unlock(&cli->cl_loi_list_lock);
1824 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
1825 page_count, aa, cli->cl_r_in_flight,
1826 cli->cl_w_in_flight);
1828 /* XXX: Maybe the caller can check the RPC bulk descriptor to
1829 * see which CPU/NUMA node the majority of pages were allocated
1830 * on, and try to assign the async RPC to the CPU core
1831 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
1833 * But on the other hand, we expect that multiple ptlrpcd
1834 * threads and the initial write sponsor can run in parallel,
1835 * especially when data checksum is enabled, which is CPU-bound
1836 * operation and single ptlrpcd thread cannot process in time.
1837 * So more ptlrpcd threads sharing BRW load
1838 * (with PDL_POLICY_ROUND) seems better.
1840 ptlrpcd_add_req(req, pol, -1);
/* Error/cleanup path (labels omitted in this extract). */
1846 cfs_memory_pressure_restore(mpflag);
1848 if (crattr != NULL) {
1849 capa_put(crattr->cra_capa);
1850 OBD_FREE(crattr, sizeof(*crattr));
1854 LASSERT(req == NULL);
1859 OBD_FREE(pga, sizeof(*pga) * page_count);
1860 /* this should happen rarely and is pretty bad, it makes the
1861 * pending list not follow the dirty order */
1862 while (!list_empty(ext_list)) {
1863 ext = list_entry(ext_list->next, struct osc_extent,
1865 list_del_init(&ext->oe_link);
1866 osc_extent_finish(env, ext, 0, rc);
1868 if (clerq && !IS_ERR(clerq))
1869 cl_req_completion(env, clerq, rc);
/*
 * Attach einfo->ei_cbdata to a DLM lock's l_ast_data, verifying first that
 * the lock's AST callbacks and resource type match what the enqueue info
 * expects.  Under the lock's resource lock: set l_ast_data if unset, then
 * report whether it now equals the requested data (i.e. ownership matches).
 * NOTE(review): the return-value assignment lines are missing from this
 * extract; the result variable is presumably set in the `if` body — confirm.
 */
1874 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1875 struct ldlm_enqueue_info *einfo)
1877 void *data = einfo->ei_cbdata;
1880 LASSERT(lock != NULL);
1881 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1882 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1883 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1884 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1886 lock_res_and_lock(lock);
/* First claimant wins; a second caller with different data fails. */
1888 if (lock->l_ast_data == NULL)
1889 lock->l_ast_data = data;
1890 if (lock->l_ast_data == data)
1893 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolve the
 * lustre_handle to a lock, attach the enqueue-info data, and drop the lock
 * reference.  A NULL lock (logged below) indicates the handle went stale,
 * e.g. after a client eviction.
 */
1898 static int osc_set_data_with_check(struct lustre_handle *lockh,
1899 struct ldlm_enqueue_info *einfo)
1901 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1905 set = osc_set_lock_data_with_check(lock, einfo);
1906 LDLM_LOCK_PUT(lock);
1908 CERROR("lockh %p, data %p - client evicted?\n",
1909 lockh, einfo->ei_cbdata);
/*
 * Finish an OSC lock enqueue: translate an intent-aborted reply into its
 * real status from the DLM reply, mark the LVB ready where appropriate,
 * invoke the caller's upcall with the final error code, and release the
 * enqueue reference on the lock for matched/granted results.
 * NOTE(review): some lines (returns, a decref arm for LOCK_MATCHED) are
 * missing from this extract.
 */
1913 static int osc_enqueue_fini(struct ptlrpc_request *req,
1914 osc_enqueue_upcall_f upcall, void *cookie,
1915 struct lustre_handle *lockh, ldlm_mode_t mode,
1916 __u64 *flags, int agl, int errcode)
1918 bool intent = *flags & LDLM_FL_HAS_INTENT;
1922 /* The request was created before ldlm_cli_enqueue call. */
1923 if (intent && errcode == ELDLM_LOCK_ABORTED) {
1924 struct ldlm_reply *rep;
1926 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1927 LASSERT(rep != NULL);
/* lock_policy_res1 carries the intent's real status in wire order. */
1929 rep->lock_policy_res1 =
1930 ptlrpc_status_ntoh(rep->lock_policy_res1);
1931 if (rep->lock_policy_res1)
1932 errcode = rep->lock_policy_res1;
1934 *flags |= LDLM_FL_LVB_READY;
1935 } else if (errcode == ELDLM_OK) {
1936 *flags |= LDLM_FL_LVB_READY;
1939 /* Call the update callback. */
1940 rc = (*upcall)(cookie, lockh, errcode);
1942 /* release the reference taken in ldlm_cli_enqueue() */
1943 if (errcode == ELDLM_LOCK_MATCHED)
1945 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
1946 ldlm_lock_decref(lockh, mode);
/*
 * Async-enqueue reply interpreter: re-resolve the lock from the saved
 * handle, pin it across the completion path, run ldlm_cli_enqueue_fini()
 * and then osc_enqueue_fini() (which fires the caller's upcall), and
 * finally drop the extra reference.
 * NOTE(review): this extract is missing lines around the AGL branch (the
 * `flags` local and the branch structure at 1984-1986) and the final RETURN.
 */
1951 static int osc_enqueue_interpret(const struct lu_env *env,
1952 struct ptlrpc_request *req,
1953 struct osc_enqueue_args *aa, int rc)
1955 struct ldlm_lock *lock;
1956 struct lustre_handle *lockh = &aa->oa_lockh;
1957 ldlm_mode_t mode = aa->oa_mode;
1958 struct ost_lvb *lvb = aa->oa_lvb;
1959 __u32 lvb_len = sizeof(*lvb);
1964 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
1966 lock = ldlm_handle2lock(lockh);
1967 LASSERTF(lock != NULL,
1968 "lockh "LPX64", req %p, aa %p - client evicted?\n",
1969 lockh->cookie, req, aa);
1971 /* Take an additional reference so that a blocking AST that
1972 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
1973 * to arrive after an upcall has been executed by
1974 * osc_enqueue_fini(). */
1975 ldlm_lock_addref(lockh, mode);
1977 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
1978 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
1980 /* Let CP AST to grant the lock first. */
1981 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* AGL enqueue path: presumably substitutes a local flags variable
 * since the caller passed none — confirm against full source. */
1984 LASSERT(aa->oa_lvb == NULL);
1985 LASSERT(aa->oa_flags == NULL);
1986 aa->oa_flags = &flags;
1989 /* Complete obtaining the lock procedure. */
1990 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
1991 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
1993 /* Complete osc stuff. */
1994 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
1995 aa->oa_flags, aa->oa_agl, rc);
1997 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the pin taken above; fini paths own their own references. */
1999 ldlm_lock_decref(lockh, mode);
2000 LDLM_LOCK_PUT(lock);
/* Sentinel "request set" pointer: callers of osc_enqueue_base() pass this
 * instead of a real set to mean "hand the request to ptlrpcd" (compared by
 * identity below, never dereferenced). */
2004 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/*
 * Enqueue an extent lock on an OST object.  First rounds the extent to page
 * boundaries and tries to match an already-cached lock (PW satisfies PR);
 * on a match the upcall is invoked directly with ELDLM_LOCK_MATCHED.
 * Otherwise builds an LDLM_ENQUEUE request (with LVB reply buffer) and
 * either sends it asynchronously via osc_enqueue_interpret or waits and
 * completes through osc_enqueue_fini.  @agl marks speculative (async glimpse
 * lock) enqueues whose result is advisory only.
 * NOTE(review): this extract has gaps (returns, some branch structure,
 * the no_match/out labels); read as incomplete.
 */
2006 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2007 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2008 * other synchronous requests, however keeping some locks and trying to obtain
2009 * others may take a considerable amount of time in a case of ost failure; and
2010 * when other sync requests do not get released lock from a client, the client
2011 * is evicted from the cluster -- such scenarious make the life difficult, so
2012 * release locks just after they are obtained. */
2013 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2014 __u64 *flags, ldlm_policy_data_t *policy,
2015 struct ost_lvb *lvb, int kms_valid,
2016 osc_enqueue_upcall_f upcall, void *cookie,
2017 struct ldlm_enqueue_info *einfo,
2018 struct ptlrpc_request_set *rqset, int async, int agl)
2020 struct obd_device *obd = exp->exp_obd;
2021 struct lustre_handle lockh = { 0 };
2022 struct ptlrpc_request *req = NULL;
2023 int intent = *flags & LDLM_FL_HAS_INTENT;
/* AGL matches must not require a ready LVB. */
2024 __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2029 /* Filesystem lock extents are extended to page boundaries so that
2030 * dealing with the page cache is a little smoother. */
2031 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2032 policy->l_extent.end |= ~PAGE_MASK;
2035 * kms is not valid when either object is completely fresh (so that no
2036 * locks are cached), or object was evicted. In the latter case cached
2037 * lock cannot be used, because it would prime inode state with
2038 * potentially stale LVB.
2043 /* Next, search for already existing extent locks that will cover us */
2044 /* If we're trying to read, we also search for an existing PW lock. The
2045 * VFS and page cache already protect us locally, so lots of readers/
2046 * writers can share a single PW lock.
2048 * There are problems with conversion deadlocks, so instead of
2049 * converting a read lock to a write lock, we'll just enqueue a new
2052 * At some point we should cancel the read lock instead of making them
2053 * send us a blocking callback, but there are problems with canceling
2054 * locks out from other users right now, too. */
2055 mode = einfo->ei_mode;
2056 if (einfo->ei_mode == LCK_PR)
2058 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2059 einfo->ei_type, policy, mode, &lockh, 0);
2061 struct ldlm_lock *matched;
2063 if (*flags & LDLM_FL_TEST_LOCK)
2066 matched = ldlm_handle2lock(&lockh);
2068 /* AGL enqueues DLM locks speculatively. Therefore if
2069 * it already exists a DLM lock, it wll just inform the
2070 * caller to cancel the AGL process for this stripe. */
2071 ldlm_lock_decref(&lockh, mode);
2072 LDLM_LOCK_PUT(matched);
2074 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2075 *flags |= LDLM_FL_LVB_READY;
2077 /* We already have a lock, and it's referenced. */
2078 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2080 ldlm_lock_decref(&lockh, mode);
2081 LDLM_LOCK_PUT(matched);
2084 ldlm_lock_decref(&lockh, mode);
2085 LDLM_LOCK_PUT(matched);
2090 if (*flags & LDLM_FL_TEST_LOCK)
/* No usable cached lock: build a real enqueue request. */
2094 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2095 &RQF_LDLM_ENQUEUE_LVB);
2099 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2101 ptlrpc_request_free(req);
2105 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2107 ptlrpc_request_set_replen(req);
2110 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2111 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2113 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2114 sizeof(*lvb), LVB_T_OST, &lockh, async);
2117 struct osc_enqueue_args *aa;
2118 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2119 aa = ptlrpc_req_async_args(req);
2121 aa->oa_mode = einfo->ei_mode;
2122 aa->oa_type = einfo->ei_type;
2123 lustre_handle_copy(&aa->oa_lockh, &lockh);
2124 aa->oa_upcall = upcall;
2125 aa->oa_cookie = cookie;
2128 aa->oa_flags = flags;
2131 /* AGL is essentially to enqueue an DLM lock
2132 * in advance, so we don't care about the
2133 * result of AGL enqueue. */
2135 aa->oa_flags = NULL;
2138 req->rq_interpret_reply =
2139 (ptlrpc_interpterer_t)osc_enqueue_interpret;
/* PTLRPCD_SET is a sentinel meaning "use a ptlrpcd thread". */
2140 if (rqset == PTLRPCD_SET)
2141 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2143 ptlrpc_set_add_req(rqset, req);
2144 } else if (intent) {
2145 ptlrpc_req_finished(req);
/* Synchronous path: complete immediately. */
2150 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2153 ptlrpc_req_finished(req);
/*
 * Match an existing cached extent lock (no enqueue): round the extent to
 * page boundaries, let ldlm_lock_match() find a covering lock (a PW lock
 * also satisfies PR requests), attach @data to the matched lock, and when a
 * PW lock satisfied a PR request convert our reference to PR mode.
 * NOTE(review): gap lines include the mode-widening setup before
 * ldlm_lock_match and the returns; `rc` carries the matched mode here.
 */
2158 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2159 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2160 __u64 *flags, void *data, struct lustre_handle *lockh,
2163 struct obd_device *obd = exp->exp_obd;
2164 __u64 lflags = *flags;
/* Fault-injection hook: pretend no lock matches. */
2168 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2171 /* Filesystem lock extents are extended to page boundaries so that
2172 * dealing with the page cache is a little smoother */
2173 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2174 policy->l_extent.end |= ~PAGE_MASK;
2176 /* Next, search for already existing extent locks that will cover us */
2177 /* If we're trying to read, we also search for an existing PW lock. The
2178 * VFS and page cache already protect us locally, so lots of readers/
2179 * writers can share a single PW lock. */
2183 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2184 res_id, type, policy, rc, lockh, unref);
2187 if (!osc_set_data_with_check(lockh, data)) {
2188 if (!(lflags & LDLM_FL_TEST_LOCK))
2189 ldlm_lock_decref(lockh, rc);
/* Matched a PW lock for a PR request: swap our reference to PR. */
2193 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2194 ldlm_lock_addref(lockh, LCK_PR);
2195 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Reply interpreter for async OST_STATFS: unpack the obd_statfs from the
 * reply, copy it into the caller's oi_osfs buffer, and invoke the oi_cb_up
 * completion callback.  -ENOTCONN/-EAGAIN with OBD_STATFS_NODELAY means the
 * request was never sent (higher-level issue) and is exited early.
 * NOTE(review): the early-exit GOTO/return lines are missing here.
 */
2202 static int osc_statfs_interpret(const struct lu_env *env,
2203 struct ptlrpc_request *req,
2204 struct osc_async_args *aa, int rc)
2206 struct obd_statfs *msfs;
2210 /* The request has in fact never been sent
2211 * due to issues at a higher level (LOV).
2212 * Exit immediately since the caller is
2213 * aware of the problem and takes care
2214 * of the clean up */
2217 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2218 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2224 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2226 GOTO(out, rc = -EPROTO);
/* Struct copy of the server's statfs into the caller's buffer. */
2229 *aa->aa_oi->oi_osfs = *msfs;
2231 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Fire an OST_STATFS request asynchronously: build and pack the request,
 * point it at the create portal, set osc_statfs_interpret as the reply
 * handler (saving @oinfo in the async args) and add it to @rqset.
 * @max_age is currently unused on the wire — see the comment below.
 * NOTE(review): allocation-failure returns and the aa_oi assignment line
 * are missing from this extract.
 */
2235 static int osc_statfs_async(struct obd_export *exp,
2236 struct obd_info *oinfo, __u64 max_age,
2237 struct ptlrpc_request_set *rqset)
2239 struct obd_device *obd = class_exp2obd(exp);
2240 struct ptlrpc_request *req;
2241 struct osc_async_args *aa;
2245 /* We could possibly pass max_age in the request (as an absolute
2246 * timestamp or a "seconds.usec ago") so the target can avoid doing
2247 * extra calls into the filesystem if that isn't necessary (e.g.
2248 * during mount that would help a bit). Having relative timestamps
2249 * is not so great if request processing is slow, while absolute
2250 * timestamps are not ideal because they need time synchronization. */
2251 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2255 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2257 ptlrpc_request_free(req);
2260 ptlrpc_request_set_replen(req);
2261 req->rq_request_portal = OST_CREATE_PORTAL;
2262 ptlrpc_at_set_req_timeout(req);
2264 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2265 /* procfs requests not want stat in wait for avoid deadlock */
2266 req->rq_no_resend = 1;
2267 req->rq_no_delay = 1;
2270 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2271 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2272 aa = ptlrpc_req_async_args(req);
2275 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: take a reference on the import under cl_sem (the
 * call may come from lprocfs and race with disconnect, see Bug15684), send
 * the request with ptlrpc_queue_wait(), and unpack the obd_statfs reply.
 * @max_age is not sent on the wire — see the comment below.
 * NOTE(review): the osfs copy-out, import-put on exit and returns are
 * missing from this extract.
 */
2279 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2280 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2282 struct obd_device *obd = class_exp2obd(exp);
2283 struct obd_statfs *msfs;
2284 struct ptlrpc_request *req;
2285 struct obd_import *imp = NULL;
2289 /*Since the request might also come from lprocfs, so we need
2290 *sync this with client_disconnect_export Bug15684*/
2291 down_read(&obd->u.cli.cl_sem);
2292 if (obd->u.cli.cl_import)
2293 imp = class_import_get(obd->u.cli.cl_import);
2294 up_read(&obd->u.cli.cl_sem);
2298 /* We could possibly pass max_age in the request (as an absolute
2299 * timestamp or a "seconds.usec ago") so the target can avoid doing
2300 * extra calls into the filesystem if that isn't necessary (e.g.
2301 * during mount that would help a bit). Having relative timestamps
2302 * is not so great if request processing is slow, while absolute
2303 * timestamps are not ideal because they need time synchronization. */
2304 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* Request allocated (or failed); the import reference can go now. */
2306 class_import_put(imp);
2311 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2313 ptlrpc_request_free(req);
2316 ptlrpc_request_set_replen(req);
2317 req->rq_request_portal = OST_CREATE_PORTAL;
2318 ptlrpc_at_set_req_timeout(req);
2320 if (flags & OBD_STATFS_NODELAY) {
2321 /* procfs requests not want stat in wait for avoid deadlock */
2322 req->rq_no_resend = 1;
2323 req->rq_no_delay = 1;
2326 rc = ptlrpc_queue_wait(req);
2330 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2332 GOTO(out, rc = -EPROTO);
2339 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher: pins the module for the duration of the call, then
 * switches on @cmd — import recovery, (de)activating the import, pinging
 * the target — and returns -ENOTTY for unknown commands.
 * NOTE(review): the switch statement head, break lines and final return are
 * missing from this extract.
 */
2343 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2344 void *karg, void *uarg)
2346 struct obd_device *obd = exp->exp_obd;
2347 struct obd_ioctl_data *data = karg;
/* Hold a module reference so the OSC cannot be unloaded mid-ioctl. */
2351 if (!try_module_get(THIS_MODULE)) {
2352 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2353 module_name(THIS_MODULE));
2357 case OBD_IOC_CLIENT_RECOVER:
2358 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2359 data->ioc_inlbuf1, 0);
2363 case IOC_OSC_SET_ACTIVE:
2364 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2367 case OBD_IOC_PING_TARGET:
2368 err = ptlrpc_obd_ping(obd);
2371 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2372 cmd, current_comm());
2373 GOTO(out, err = -ENOTTY);
2376 module_put(THIS_MODULE);
/*
 * osc_set_info_async(): handle a set_info key/value pair.
 *
 * Several keys are handled entirely on the client (checksum toggle,
 * sptlrpc config adaptation, security-context flush, client-cache
 * attach, LRU shrink); everything else is packed into an OST_SET_INFO
 * RPC and sent to the OST.  KEY_GRANT_SHRINK requests are special:
 * they use the RQF_OST_SET_GRANT_INFO format, carry an obdo, get an
 * interpret callback, and are queued on ptlrpcd instead of the
 * caller-supplied request set.
 * NOTE(review): listing is elided (returns, braces, second capsule
 * field name at 2467) — confirm details against the full source.
 */
2380 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2381 u32 keylen, void *key,
2382 u32 vallen, void *val,
2383 struct ptlrpc_request_set *set)
2385 struct ptlrpc_request *req;
2386 struct obd_device *obd = exp->exp_obd;
2387 struct obd_import *imp = class_exp2cliimp(exp);
/* fault-injection point used by recovery tests */
2392 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
/* enable/disable data checksums; value must be exactly an int */
2394 if (KEY_IS(KEY_CHECKSUM)) {
2395 if (vallen != sizeof(int))
2397 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
/* notify the sptlrpc layer that security config changed */
2401 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2402 sptlrpc_conf_client_adapt(obd);
/* flush this client's security contexts on the import */
2406 if (KEY_IS(KEY_FLUSH_CTX)) {
2407 sptlrpc_import_flush_my_ctx(imp);
/* attach the shared client cache; val is the cl_client_cache pointer */
2411 if (KEY_IS(KEY_CACHE_SET)) {
2412 struct client_obd *cli = &obd->u.cli;
2414 LASSERT(cli->cl_cache == NULL); /* only once */
2415 cli->cl_cache = (struct cl_client_cache *)val;
2416 cl_cache_incref(cli->cl_cache);
2417 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2419 /* add this osc into entity list */
2420 LASSERT(list_empty(&cli->cl_lru_osc));
2421 spin_lock(&cli->cl_cache->ccc_lru_lock);
2422 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2423 spin_unlock(&cli->cl_cache->ccc_lru_lock);
/* shrink LRU: at most half of the in-list pages, capped by target */
2428 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2429 struct client_obd *cli = &obd->u.cli;
2430 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2431 long target = *(long *)val;
2433 nr = osc_lru_shrink(env, cli, min(nr, target), true);
/* a request set is mandatory for everything except grant shrink */
2438 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2441 /* We pass all other commands directly to OST. Since nobody calls osc
2442 methods directly and everybody is supposed to go through LOV, we
2443 assume lov checked invalid values for us.
2444 The only recognised values so far are evict_by_nid and mds_conn.
2445 Even if something bad goes through, we'd get a -EINVAL from OST
2448 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2449 &RQF_OST_SET_GRANT_INFO :
2454 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2455 RCL_CLIENT, keylen);
/* grant-shrink format carries an obdo instead of a raw value field */
2456 if (!KEY_IS(KEY_GRANT_SHRINK))
2457 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2458 RCL_CLIENT, vallen);
2459 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2461 ptlrpc_request_free(req);
/* copy key and value into the request buffers */
2465 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2466 memcpy(tmp, key, keylen);
2467 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2470 memcpy(tmp, val, vallen);
/* grant shrink: stash async args and set the interpret callback */
2472 if (KEY_IS(KEY_GRANT_SHRINK)) {
2473 struct osc_grant_args *aa;
2476 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2477 aa = ptlrpc_req_async_args(req);
2480 ptlrpc_req_finished(req);
2483 *oa = ((struct ost_body *)val)->oa;
2485 req->rq_interpret_reply = osc_shrink_grant_interpret;
2488 ptlrpc_request_set_replen(req);
/* non-shrink requests go through the caller's set; shrink via ptlrpcd */
2489 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2490 LASSERT(set != NULL);
2491 ptlrpc_set_add_req(set, req);
2492 ptlrpc_check_set(NULL, set);
2494 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * osc_reconnect(): recompute the grant to request from the server when
 * reconnecting.  Under cl_loi_list_lock: ask for current available
 * grant plus the grant covering dirty pages, falling back to twice the
 * BRW size if that sum is zero (the "?:" GNU elvis operator), and reset
 * the lost-grant counter after sampling it for the debug message.
 */
2499 static int osc_reconnect(const struct lu_env *env,
2500 struct obd_export *exp, struct obd_device *obd,
2501 struct obd_uuid *cluuid,
2502 struct obd_connect_data *data,
2505 struct client_obd *cli = &obd->u.cli;
/* only meaningful if the server supports the grant feature */
2507 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2510 spin_lock(&cli->cl_loi_list_lock);
2511 data->ocd_grant = (cli->cl_avail_grant +
2512 (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2513 2 * cli_brw_size(obd);
2514 lost_grant = cli->cl_lost_grant;
2515 cli->cl_lost_grant = 0;
2516 spin_unlock(&cli->cl_loi_list_lock);
2518 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2519 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2520 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * osc_disconnect(): disconnect the export, then remove this client
 * from the grant-shrink list — but only once the import is actually
 * gone (cl_import == NULL), to avoid the pinger racing with cleanup
 * as described in the comment below (BUG18662).
 */
2526 static int osc_disconnect(struct obd_export *exp)
2528 struct obd_device *obd = class_exp2obd(exp);
2531 rc = client_disconnect_export(exp);
2533 * Initially we put del_shrink_grant before disconnect_export, but it
2534 * causes the following problem if setup (connect) and cleanup
2535 * (disconnect) are tangled together.
2536 * connect p1 disconnect p2
2537 * ptlrpc_connect_import
2538 * ............... class_manual_cleanup
2541 * ptlrpc_connect_interrupt
2543 * add this client to shrink list
2545 * Bang! pinger trigger the shrink.
2546 * So the osc should be disconnected from the shrink list, after we
2547 * are sure the import has been destroyed. BUG18662
2549 if (obd->u.cli.cl_import == NULL)
2550 osc_del_shrink_grant(&obd->u.cli);
/*
 * osc_import_event(): react to import state changes.
 * DISCON zeroes the grant counters; INVALIDATE flushes all cached
 * pages (their RPCs will fail against the now-invalid import) and
 * cleans the local lock namespace; OCD re-initializes grant and the
 * request portal from the freshly negotiated connect data; the
 * (IN)ACTIVE/(DE)ACTIVATE events are forwarded to the observer.
 * NOTE(review): several case bodies are elided in this listing.
 */
2554 static int osc_import_event(struct obd_device *obd,
2555 struct obd_import *imp,
2556 enum obd_import_event event)
2558 struct client_obd *cli;
2562 LASSERT(imp->imp_obd == obd);
/* connection lost: all outstanding grant is gone */
2565 case IMP_EVENT_DISCON: {
2567 spin_lock(&cli->cl_loi_list_lock);
2568 cli->cl_avail_grant = 0;
2569 cli->cl_lost_grant = 0;
2570 spin_unlock(&cli->cl_loi_list_lock);
2573 case IMP_EVENT_INACTIVE: {
2574 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
/* import invalidated: flush cached I/O and local DLM state */
2577 case IMP_EVENT_INVALIDATE: {
2578 struct ldlm_namespace *ns = obd->obd_namespace;
2582 env = cl_env_get(&refcheck);
2586 /* all pages go to failing rpcs due to the invalid
2588 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2590 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2591 cl_env_put(env, &refcheck);
2596 case IMP_EVENT_ACTIVE: {
2597 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
/* connect data (re)negotiated: pick up grant and portal settings */
2600 case IMP_EVENT_OCD: {
2601 struct obd_connect_data *ocd = &imp->imp_connect_data;
2603 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
2604 osc_init_grant(&obd->u.cli, ocd);
2607 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
2608 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
2610 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
2613 case IMP_EVENT_DEACTIVATE: {
2614 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
2617 case IMP_EVENT_ACTIVATE: {
2618 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
2622 CERROR("Unknown import event %d\n", event);
2629 * Determine whether the lock can be canceled before replaying the lock
2630 * during recovery, see bug16774 for detailed information.
2632 * \retval zero the lock can't be canceled
2633 * \retval other ok to cancel
2635 static int osc_cancel_weight(struct ldlm_lock *lock)
2638 * Cancel all unused and granted extent lock.
/* extent lock, fully granted, and weighed as unused -> cancelable */
2640 if (lock->l_resource->lr_type == LDLM_EXTENT &&
2641 lock->l_granted_mode == lock->l_req_mode &&
2642 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work(): ptlrpcd work callback that flushes pending
 * writeback for this client obd by unplugging queued I/O.
 */
2648 static int brw_queue_work(const struct lu_env *env, void *data)
2650 struct client_obd *cli = data;
2652 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2654 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * osc_setup(): initialize an OSC obd device.
 * Steps: take a ptlrpcd reference, run generic client setup, allocate
 * the writeback and LRU ptlrpcd work items, set up quota, register
 * procfs (special-casing a co-located OSP that owns the proc dir),
 * pre-allocate a request pool for brw_interpret, and register the
 * grant-shrink list plus the lock cancel-weight callback.  The error
 * path below tears down the work items and the generic client state.
 * NOTE(review): error-check lines after several calls are elided here.
 */
2658 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2660 struct client_obd *cli = &obd->u.cli;
2661 struct obd_type *type;
2666 rc = ptlrpcd_addref();
2670 rc = client_obd_setup(obd, lcfg);
2672 GOTO(out_ptlrpcd, rc);
/* async writeback work item, run by ptlrpcd */
2674 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
2675 if (IS_ERR(handler))
2676 GOTO(out_client_setup, rc = PTR_ERR(handler));
2677 cli->cl_writeback_work = handler;
/* async LRU shrink work item */
2679 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
2680 if (IS_ERR(handler))
2681 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
2682 cli->cl_lru_work = handler;
2684 rc = osc_quota_setup(obd);
2686 GOTO(out_ptlrpcd_work, rc);
2688 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
2690 #ifdef CONFIG_PROC_FS
2691 obd->obd_vars = lprocfs_osc_obd_vars;
2693 /* If this is true then both client (osc) and server (osp) are on the
2694 * same node. The osp layer if loaded first will register the osc proc
2695 * directory. In that case this obd_device will be attached its proc
2696 * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
2697 type = class_search_type(LUSTRE_OSP_NAME);
2698 if (type && type->typ_procsym) {
2699 obd->obd_proc_entry = lprocfs_register(obd->obd_name,
2701 obd->obd_vars, obd);
2702 if (IS_ERR(obd->obd_proc_entry)) {
2703 rc = PTR_ERR(obd->obd_proc_entry);
/* proc failure is non-fatal: log and continue without the entry */
2704 CERROR("error %d setting up lprocfs for %s\n", rc,
2706 obd->obd_proc_entry = NULL;
2709 rc = lprocfs_obd_setup(obd);
2712 /* If the basic OSC proc tree construction succeeded then
2713 * lets do the rest. */
2715 lproc_osc_attach_seqstat(obd);
2716 sptlrpc_lprocfs_cliobd_attach(obd);
2717 ptlrpc_lprocfs_register_obd(obd);
2720 /* We need to allocate a few requests more, because
2721 * brw_interpret tries to create new requests before freeing
2722 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
2723 * reserved, but I'm afraid that might be too much wasted RAM
2724 * in fact, so 2 is just my guess and still should work. */
2725 cli->cl_import->imp_rq_pool =
2726 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
2728 ptlrpc_add_rqs_to_pool);
2730 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
2731 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
/* error path: destroy whichever work items were created, then undo
 * the generic client setup (labels elided in this listing) */
2735 if (cli->cl_writeback_work != NULL) {
2736 ptlrpcd_destroy_work(cli->cl_writeback_work);
2737 cli->cl_writeback_work = NULL;
2739 if (cli->cl_lru_work != NULL) {
2740 ptlrpcd_destroy_work(cli->cl_lru_work);
2741 cli->cl_lru_work = NULL;
2744 client_obd_cleanup(obd);
/*
 * osc_precleanup(): staged teardown before osc_cleanup().
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS waits out zombie exports, destroys the ptlrpcd
 * work items created in osc_setup(), drops the client import, and
 * unregisters the procfs entries.
 */
2750 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2756 case OBD_CLEANUP_EARLY: {
2757 struct obd_import *imp;
2758 imp = obd->u.cli.cl_import;
2759 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
2760 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
2761 ptlrpc_deactivate_import(imp);
/* stop the pinger touching this import; needs imp_lock */
2762 spin_lock(&imp->imp_lock);
2763 imp->imp_pingable = 0;
2764 spin_unlock(&imp->imp_lock);
2767 case OBD_CLEANUP_EXPORTS: {
2768 struct client_obd *cli = &obd->u.cli;
2770 * for echo client, export may be on zombie list, wait for
2771 * zombie thread to cull it, because cli.cl_import will be
2772 * cleared in client_disconnect_export():
2773 * class_export_destroy() -> obd_cleanup() ->
2774 * echo_device_free() -> echo_client_cleanup() ->
2775 * obd_disconnect() -> osc_disconnect() ->
2776 * client_disconnect_export()
2778 obd_zombie_barrier();
2779 if (cli->cl_writeback_work) {
2780 ptlrpcd_destroy_work(cli->cl_writeback_work);
2781 cli->cl_writeback_work = NULL;
2783 if (cli->cl_lru_work) {
2784 ptlrpcd_destroy_work(cli->cl_lru_work);
2785 cli->cl_lru_work = NULL;
2787 obd_cleanup_client_import(obd);
2788 ptlrpc_lprocfs_unregister_obd(obd);
2789 lprocfs_obd_cleanup(obd);
/*
 * osc_cleanup(): final teardown.  Detach this OSC from the shared
 * client cache LRU (under ccc_lru_lock) and drop the cache reference
 * taken in osc_set_info_async(KEY_CACHE_SET), free the quota cache,
 * then run the generic client cleanup.
 */
2796 int osc_cleanup(struct obd_device *obd)
2798 struct client_obd *cli = &obd->u.cli;
2804 if (cli->cl_cache != NULL) {
2805 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2806 spin_lock(&cli->cl_cache->ccc_lru_lock);
2807 list_del_init(&cli->cl_lru_osc);
2808 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2809 cli->cl_lru_left = NULL;
2810 cl_cache_decref(cli->cl_cache);
2811 cli->cl_cache = NULL;
2814 /* free memory of osc quota cache */
2815 osc_quota_cleanup(obd);
2817 rc = client_obd_cleanup(obd);
/*
 * osc_process_config_base(): apply a config-log parameter to this
 * device via the PARAM_OSC proc handlers.  Positive return values
 * (matched-but-informational) are folded to success.
 */
2823 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2825 int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2826 return rc > 0 ? 0: rc;
/* obd_ops wrapper: forward the raw config buffer to the base handler */
2829 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2831 return osc_process_config_base(obd, buf);
/*
 * Method table registered for the OSC obd type: connection management
 * is delegated to the generic client_* helpers, everything else to the
 * osc_* implementations in this file (and elsewhere in the module).
 */
2834 static struct obd_ops osc_obd_ops = {
2835 .o_owner = THIS_MODULE,
2836 .o_setup = osc_setup,
2837 .o_precleanup = osc_precleanup,
2838 .o_cleanup = osc_cleanup,
2839 .o_add_conn = client_import_add_conn,
2840 .o_del_conn = client_import_del_conn,
2841 .o_connect = client_connect_import,
2842 .o_reconnect = osc_reconnect,
2843 .o_disconnect = osc_disconnect,
2844 .o_statfs = osc_statfs,
2845 .o_statfs_async = osc_statfs_async,
2846 .o_create = osc_create,
2847 .o_destroy = osc_destroy,
2848 .o_getattr = osc_getattr,
2849 .o_setattr = osc_setattr,
2850 .o_iocontrol = osc_iocontrol,
2851 .o_set_info_async = osc_set_info_async,
2852 .o_import_event = osc_import_event,
2853 .o_process_config = osc_process_config,
2854 .o_quotactl = osc_quotactl,
/*
 * osc_init(): module entry point.  Initialize the slab caches, decide
 * whether to enable procfs (skipped when a co-located OSP already owns
 * the proc directory), and register the OSC obd type; cache init is
 * rolled back if registration fails.
 */
2857 static int __init osc_init(void)
2859 bool enable_proc = true;
2860 struct obd_type *type;
2864 /* print an address of _any_ initialized kernel symbol from this
2865 * module, to allow debugging with gdb that doesn't support data
2866 * symbols from modules.*/
2867 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2869 rc = lu_kmem_init(osc_caches);
2873 type = class_search_type(LUSTRE_OSP_NAME);
2874 if (type != NULL && type->typ_procsym != NULL)
2875 enable_proc = false;
2877 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2878 LUSTRE_OSC_NAME, &osc_device_type);
/* registration failed: undo the kmem caches (error check elided) */
2880 lu_kmem_fini(osc_caches);
/* Module exit: unregister the obd type, then free the slab caches. */
2887 static void /*__exit*/ osc_exit(void)
2889 class_unregister_type(LUSTRE_OSC_NAME);
2890 lu_kmem_fini(osc_caches);
/* Standard kernel module metadata and init/exit hook registration. */
2893 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2894 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2895 MODULE_VERSION(LUSTRE_VERSION_STRING);
2896 MODULE_LICENSE("GPL");
2898 module_init(osc_init);
2899 module_exit(osc_exit);