Whamcloud gitweb — lustre-release.git: lustre/osc/osc_request.c
Commit: LU-6017 obd: remove destroy cookie handling
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/*
 * Per-RPC state for an asynchronous bulk read/write (BRW) request.
 * Stored in ptlrpc_request::rq_async_args and unpacked again by
 * brw_interpret() when the reply arrives.  Also reused (aliased as
 * struct osc_grant_args below) by the grant-shrink reply handler,
 * which only touches aa_oa.
 */
struct osc_brw_async_args {
	struct obdo              *aa_oa;            /* object attributes for this I/O */
	int                       aa_requested_nob; /* bytes requested in this RPC */
	int                       aa_nio_count;     /* I/O descriptor count — TODO confirm units */
	u32                       aa_page_count;    /* pages referenced by aa_ppga */
	int                       aa_resends;       /* resend attempts so far */
	struct brw_page         **aa_ppga;          /* page array for the bulk transfer */
	struct client_obd        *aa_cli;           /* owning client */
	struct list_head          aa_oaps;          /* async pages in this RPC */
	struct list_head          aa_exts;          /* extents covered by this RPC */
	struct obd_capa          *aa_ocapa;         /* capability, if any */
	struct cl_req            *aa_clerq;         /* cl_req transfer descriptor */
};
68
69 #define osc_grant_args osc_brw_async_args
70
/*
 * Async-args context for a setattr/punch RPC: carried in rq_async_args
 * and consumed by osc_setattr_interpret(), which copies the reply obdo
 * into *sa_oa and then invokes sa_upcall(sa_cookie, rc).
 */
struct osc_setattr_args {
	struct obdo             *sa_oa;      /* attributes; updated from the reply */
	obd_enqueue_update_f     sa_upcall;  /* completion callback */
	void                    *sa_cookie;  /* opaque argument for sa_upcall */
};
76
/*
 * Async-args context for an OST_SYNC RPC: osc_sync_interpret() copies
 * the reply obdo into *fa_oi->oi_oa and calls fa_upcall(fa_cookie, rc).
 */
struct osc_fsync_args {
	struct obd_info         *fa_oi;      /* oi_oa receives the reply attributes */
	obd_enqueue_update_f     fa_upcall;  /* completion callback */
	void                    *fa_cookie;  /* opaque argument for fa_upcall */
};
82
/*
 * Async-args context for a DLM lock enqueue.  The code that fills and
 * interprets these fields lives outside this part of the file, so the
 * comments below are name-level only — verify against the enqueue path.
 */
struct osc_enqueue_args {
	struct obd_export       *oa_exp;     /* export the enqueue is sent on */
	ldlm_type_t             oa_type;     /* lock type (e.g. extent) */
	ldlm_mode_t             oa_mode;     /* requested lock mode */
	__u64                   *oa_flags;   /* in/out LDLM flags */
	osc_enqueue_upcall_f    oa_upcall;   /* completion callback */
	void                    *oa_cookie;  /* opaque argument for oa_upcall */
	struct ost_lvb          *oa_lvb;     /* lock value block from the server */
	struct lustre_handle    oa_lockh;    /* handle of the granted lock */
	unsigned int            oa_agl:1;    /* presumably async glimpse lock — confirm */
};
94
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
97                          void *data, int rc);
98
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100                                  struct ost_body *body, void *capa)
101 {
102         struct obd_capa *oc = (struct obd_capa *)capa;
103         struct lustre_capa *c;
104
105         if (!capa)
106                 return;
107
108         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
109         LASSERT(c);
110         capa_cpy(c, oc);
111         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112         DEBUG_CAPA(D_SEC, c, "pack");
113 }
114
/*
 * Fill the OST_BODY request buffer of @req from @oinfo: convert the
 * in-memory obdo to wire format (honouring the peer's connect data)
 * and append the capability, if one is supplied.
 */
void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	/* No-op when oi_capa is NULL. */
	osc_pack_capa(req, body, oinfo->oi_capa);
}
126
127 void osc_set_capa_size(struct ptlrpc_request *req,
128                        const struct req_msg_field *field,
129                        struct obd_capa *oc)
130 {
131         if (oc == NULL)
132                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
133         else
134                 /* it is already calculated as sizeof struct obd_capa */
135                 ;
136 }
137
/*
 * Reply handler for an asynchronous OST_GETATTR: unpack the returned
 * attributes into aa->aa_oi->oi_oa and hand the final status to the
 * oi_cb_up upcall.  Returns whatever the upcall returns.
 */
int osc_getattr_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req,
                          struct osc_async_args *aa, int rc)
{
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		/* Reply lacked a valid ost_body: protocol error; also
		 * invalidate the caller's obdo so stale fields are not used. */
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	/* The upcall always runs, on success or failure. */
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	RETURN(rc);
}
166
/*
 * Synchronous OST_GETATTR: fetch the attributes of the object named by
 * oinfo->oi_oa from the OST and unpack them back into oinfo->oi_oa.
 * Returns 0 on success, -ENOMEM on allocation failure, -EPROTO on a
 * malformed reply, or the RPC error.
 */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int                    rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	/* Shrink the capability field first if no capability is sent. */
	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	/* Block size is supplied locally rather than by the server. */
	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
out:
	ptlrpc_req_finished(req);
	return rc;
}
210
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212                        struct obd_info *oinfo)
213 {
214         struct ptlrpc_request *req;
215         struct ost_body       *body;
216         int                    rc;
217         ENTRY;
218
219         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
220
221         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
222         if (req == NULL)
223                 RETURN(-ENOMEM);
224
225         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
227         if (rc) {
228                 ptlrpc_request_free(req);
229                 RETURN(rc);
230         }
231
232         osc_pack_req_body(req, oinfo);
233
234         ptlrpc_request_set_replen(req);
235
236         rc = ptlrpc_queue_wait(req);
237         if (rc)
238                 GOTO(out, rc);
239
240         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
241         if (body == NULL)
242                 GOTO(out, rc = -EPROTO);
243
244         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
245                              &body->oa);
246
247         EXIT;
248 out:
249         ptlrpc_req_finished(req);
250         RETURN(rc);
251 }
252
/*
 * Reply handler for an asynchronous setattr/punch: on success copy the
 * server's attributes into sa->sa_oa, then always invoke the caller's
 * upcall with the final status.  Returns the upcall's return value.
 */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}
273
/*
 * Send an OST_SETATTR without blocking the caller.
 *
 * If @rqset is NULL the request is fire-and-forget: it is handed to
 * ptlrpcd and no interpret callback or upcall runs.  Otherwise
 * osc_setattr_interpret() will deliver the result to @upcall(@cookie)
 * when the reply arrives; the special value PTLRPCD_SET routes the
 * request through ptlrpcd instead of the caller's set.
 * Returns 0 on dispatch, or a negative errno on setup failure.
 */
int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	int                      rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		/* Async args live inside the request; ensure they fit. */
		CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}
320
/*
 * Synchronous OST_CREATE for echo-client objects only (the sequence of
 * @oa's object id must be an echo sequence; regular OST object creation
 * is handled elsewhere).  On success the server's attributes are copied
 * back into @oa and a local block size is filled in.
 * Returns 0 on success or a negative errno.
 */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int                    rc;
	ENTRY;

	LASSERT(oa != NULL);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	/* Block size is supplied locally rather than by the server. */
	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: "LPD64"\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	RETURN(rc);
}
371
/*
 * Send an asynchronous OST_PUNCH (truncate/hole-punch) RPC.  The result
 * is delivered through osc_setattr_interpret(), which updates
 * oinfo->oi_oa from the reply and calls @upcall(@cookie, rc).  With
 * rqset == PTLRPCD_SET the request goes to ptlrpcd; otherwise it is
 * added to the caller's set.  Returns 0 on dispatch or a negative errno.
 */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	struct ost_body         *body;
	int                      rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	/* Punch replies are handled by the generic setattr interpreter. */
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa     = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}
416
/*
 * Reply handler for OST_SYNC: on success copy the returned attributes
 * into the caller's obd_info, then always invoke the upcall with the
 * final status.  Returns the upcall's return value.
 */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;
	ENTRY;

	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR ("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	/* Struct copy of the server's attributes into the caller's obdo. */
	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}
439
/*
 * Send an asynchronous OST_SYNC RPC for the object in oinfo->oi_oa.
 * The sync range is carried in the oa's size/blocks fields (see the
 * overload comment below).  osc_sync_interpret() delivers the result to
 * @upcall(@cookie).  With rqset == PTLRPCD_SET the request goes to
 * ptlrpcd; otherwise it joins the caller's set.
 * Returns 0 on dispatch or a negative errno.
 */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int                    rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN (0);
}
484
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;
	ENTRY;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	/* Resource name is derived from the object id in @oa. */
	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		RETURN(0);

	/* Hold a debug reference across the local cancel scan. */
	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}
519
/*
 * Destroy-RPC reply handler: drop the in-flight destroy counter and
 * wake any thread throttled in osc_destroy() waiting for a free slot.
 * The RPC status itself is ignored (always returns 0).
 */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}
530
/*
 * Try to reserve a slot for one more destroy RPC.
 *
 * Returns 1 (keeping the counter incremented) when the new total is
 * within cl_max_rpcs_in_flight; otherwise undoes the increment and
 * returns 0.  The check after the decrement handles the race where a
 * destroy completed between the two atomic operations: in that case a
 * waiter must be woken here or its wakeup would be lost.
 */
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
548
/*
 * Destroy an OST object described by @oa.
 *
 * First cancels matching PW locks locally (early lock cancel), piggy-
 * backing the cancels on the destroy RPC, then dispatches the destroy
 * through ptlrpcd without waiting for the reply.  The number of
 * concurrent destroy RPCs is throttled to cl_max_rpcs_in_flight via
 * osc_can_send_destroy(); this thread may sleep (interruptibly) until a
 * slot frees up.  Returns 0 on dispatch or a negative errno.
 */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
	struct client_obd     *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct list_head       cancels = LIST_HEAD_INIT(cancels);
	int rc, count;
	ENTRY;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	/* Collect local PW locks on this object; their data is discarded. */
	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		/* Release the lock references taken for the cancel list. */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	osc_set_capa_size(req, &RMF_CAPA1, NULL);
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * under max_rpc_in_flight
		 */
		l_wait_event_exclusive(cli->cl_destroy_waitq,
				       osc_can_send_destroy(cli), &lwi);
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	RETURN(0);
}
606
/*
 * Fill the grant/dirty accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) so an outgoing RPC tells the server how much
 * cache and grant this client holds.  All counters are sampled under
 * cl_loi_list_lock.  Inconsistent counters are reported and clamp
 * o_undirty to 0.  Note: @writing_bytes is currently unused here.
 */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	/* Caller must not have set these bits already. */
	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
		     cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages, cli->cl_dirty_transit,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
			    atomic_long_read(&obd_dirty_transit_pages) >
			    (obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
		       cli->cl_import->imp_obd->obd_name,
		       atomic_long_read(&obd_dirty_pages),
		       atomic_long_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		/* o_undirty is a 32-bit field on the wire; guard overflow. */
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		/* Announce room for a full pipeline of RPCs (or the dirty
		 * cache limit, whichever is larger). */
		unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT) *
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
				    max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
655
/*
 * Schedule the next grant-shrink consideration to
 * cl_grant_shrink_interval from now.
 */
void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
	       cli->cl_next_shrink_grant);
}
663
/* Add @grant bytes to the client's available grant, under the
 * cl_loi_list_lock that protects the grant counters. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
}
670
671 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
672 {
673         if (body->oa.o_valid & OBD_MD_FLGRANT) {
674                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
675                 __osc_update_grant(cli, body->oa.o_grant);
676         }
677 }
678
679 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
680                               u32 keylen, void *key,
681                               u32 vallen, void *val,
682                               struct ptlrpc_request_set *set);
683
/*
 * Reply handler for a grant-shrink set_info RPC: on failure, return the
 * grant we tried to give back to our local available pool; on success,
 * apply whatever grant the server's reply carries.  Always frees the
 * obdo stashed in the async args.
 */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		/* Shrink failed: reclaim the grant locally. */
		__osc_update_grant(cli, oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}
704
/*
 * Move a quarter of the currently available grant from the client into
 * @oa->o_grant (under cl_loi_list_lock) and mark the obdo with
 * OBD_FL_SHRINK_GRANT so the server knows this RPC releases grant.
 * Also reschedules the next shrink check.
 */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	/* o_flags is only meaningful when OBD_MD_FLFLAGS is set. */
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}
718
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	/* Enough grant for a full pipeline of maximum-size RPCs. */
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	/* Already at or below the pipeline target: fall back to the
	 * single-RPC floor. */
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
735
/*
 * Release grant back to the server until only @target_bytes remain
 * locally.  The target is floored at one full RPC's worth; if we are
 * already at or below the target nothing is sent.  The released amount
 * is carried to the server in a KEY_GRANT_SHRINK set_info RPC; on
 * send failure the grant is reclaimed locally.
 * Returns 0 on success or a negative errno.
 */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int                     rc = 0;
	struct ost_body        *body;
	ENTRY;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	/* Fill current dirty/grant accounting into the body's obdo. */
	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	/* Everything above the target is released to the server. */
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		/* RPC never went out: take the grant back. */
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	RETURN(rc);
}
780
781 static int osc_should_shrink_grant(struct client_obd *client)
782 {
783         cfs_time_t time = cfs_time_current();
784         cfs_time_t next_shrink = client->cl_next_shrink_grant;
785
786         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
787              OBD_CONNECT_GRANT_SHRINK) == 0)
788                 return 0;
789
790         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
791                 /* Get the current RPC size directly, instead of going via:
792                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
793                  * Keep comment here so that it can be found by searching. */
794                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
795
796                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
797                     client->cl_avail_grant > brw_size)
798                         return 1;
799                 else
800                         osc_update_next_shrink(client);
801         }
802         return 0;
803 }
804
805 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
806 {
807         struct client_obd *client;
808
809         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
810                 if (osc_should_shrink_grant(client))
811                         osc_shrink_grant(client);
812         }
813         return 0;
814 }
815
816 static int osc_add_shrink_grant(struct client_obd *client)
817 {
818         int rc;
819
820         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
821                                        TIMEOUT_GRANT,
822                                        osc_grant_shrink_grant_cb, NULL,
823                                        &client->cl_grant_shrink_list);
824         if (rc) {
825                 CERROR("add grant client %s error %d\n",
826                         client->cl_import->imp_obd->obd_name, rc);
827                 return rc;
828         }
829         CDEBUG(D_CACHE, "add grant client %s \n",
830                client->cl_import->imp_obd->obd_name);
831         osc_update_next_shrink(client);
832         return 0;
833 }
834
/* Unregister @client from the TIMEOUT_GRANT timer (inverse of
 * osc_add_shrink_grant()). */
static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}
840
/*
 * Initialize the client's grant accounting from the server's connect
 * data (called at connect/reconnect time): derive cl_avail_grant from
 * ocd_grant, set the extent chunk size from the server block size, and
 * start the grant-shrink timer if the server supports shrinking.
 */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty_pages must be 0 already.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant -
				      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
		"chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
		cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	/* Only register the shrink timer once per client. */
	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}
880
881 /* We assume that the reason this OSC got a short read is because it read
882  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
883  * via the LOV, and it _knows_ it's reading inside the file, it's just that
884  * this stripe never got written at or beyond this stripe offset yet. */
885 static void handle_short_read(int nob_read, size_t page_count,
886                               struct brw_page **pga)
887 {
888         char *ptr;
889         int i = 0;
890
891         /* skip bytes read OK */
892         while (nob_read > 0) {
893                 LASSERT (page_count > 0);
894
895                 if (pga[i]->count > nob_read) {
896                         /* EOF inside this page */
897                         ptr = kmap(pga[i]->pg) +
898                                 (pga[i]->off & ~PAGE_MASK);
899                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
900                         kunmap(pga[i]->pg);
901                         page_count--;
902                         i++;
903                         break;
904                 }
905
906                 nob_read -= pga[i]->count;
907                 page_count--;
908                 i++;
909         }
910
911         /* zero remaining pages */
912         while (page_count-- > 0) {
913                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
914                 memset(ptr, 0, pga[i]->count);
915                 kunmap(pga[i]->pg);
916                 i++;
917         }
918 }
919
920 static int check_write_rcs(struct ptlrpc_request *req,
921                            int requested_nob, int niocount,
922                            size_t page_count, struct brw_page **pga)
923 {
924         int     i;
925         __u32   *remote_rcs;
926
927         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
928                                                   sizeof(*remote_rcs) *
929                                                   niocount);
930         if (remote_rcs == NULL) {
931                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
932                 return(-EPROTO);
933         }
934
935         /* return error if any niobuf was in error */
936         for (i = 0; i < niocount; i++) {
937                 if ((int)remote_rcs[i] < 0)
938                         return(remote_rcs[i]);
939
940                 if (remote_rcs[i] != 0) {
941                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
942                                 i, remote_rcs[i], req);
943                         return(-EPROTO);
944                 }
945         }
946
947         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
948                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
949                        req->rq_bulk->bd_nob_transferred, requested_nob);
950                 return(-EPROTO);
951         }
952
953         return (0);
954 }
955
956 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
957 {
958         if (p1->flag != p2->flag) {
959                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
960                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
961                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
962
963                 /* warn if we try to combine flags that we don't know to be
964                  * safe to combine */
965                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
966                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
967                               "report this at https://jira.hpdd.intel.com/\n",
968                               p1->flag, p2->flag);
969                 }
970                 return 0;
971         }
972
973         return (p1->off + p1->count == p2->off);
974 }
975
/* Compute the bulk checksum over the first @nob bytes of the @pg_count
 * pages in @pga, using the hash algorithm mapped from @cksum_type.
 * @opc (OST_READ/OST_WRITE) selects which fault-injection hook applies.
 * Returns the checksum; if the hash cannot be initialized the PTR_ERR()
 * value is returned cast to u32. */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     cksum_type_t cksum_type)
{
	u32				cksum;
	int				i = 0;
	struct cfs_crypto_hash_desc	*hdesc;
	unsigned int			bufsize;
	int				err;
	unsigned char			cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		/* only hash the first nob bytes of a partially-used page */
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		/* nob may go negative on the final partial page; the loop
		 * condition terminates either way */
		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(cksum);
	/* NOTE(review): err from cfs_crypto_hash_final is never checked, so
	 * a finalization failure would return an undefined cksum -- confirm
	 * whether callers tolerate this */
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}
1030
/* Build (but do not send) a BRW bulk RPC for @page_count pages in @pga.
 * @cmd selects OBD_BRW_READ or OBD_BRW_WRITE; adjacent pages whose flags
 * match and whose extents are contiguous are merged into a single remote
 * niobuf.  For writes, a bulk checksum is computed and packed when
 * cl_checksum is enabled and the sptlrpc flavor doesn't already protect
 * the bulk.  @resend marks the RPC with OBD_FL_RECOV_RESEND; @reserve
 * takes a capa reference into the async args.  On success *reqp holds
 * the prepared request and 0 is returned; otherwise a negative errno. */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
		     u32 page_count, struct brw_page **pga,
		     struct ptlrpc_request **reqp, struct obd_capa *ocapa,
		     int reserve, int resend)
{
	struct ptlrpc_request	*req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body		*body;
	struct obd_ioobj	*ioobj;
	struct niobuf_remote	*niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule	*pill;
	struct brw_page *pg_prev;

	ENTRY;
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	/* count distinct niobufs: each run of mergeable pages shares one */
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request.  The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so the
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
			 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		/* extend the previous niobuf instead of starting a new one
		 * when this page merges with its predecessor */
		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->rnb_len += pg->count;
		} else {
			niobuf->rnb_offset = pg->off;
			niobuf->rnb_len    = pg->count;
			niobuf->rnb_flags  = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
		req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
		niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
	RETURN(0);

 out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}
1236
/* Compare the server's write checksum against the one the client sent.
 * On mismatch, recompute the checksum locally (using the type the server
 * reported) to narrow down where the corruption happened, and log a
 * console error.  Returns 0 when the checksums agree, 1 on mismatch. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				size_t page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	/* recompute with the checksum type the server actually used */
	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), "
	       "client csum now %x\n", client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}
1281
/* Complete a BRW RPC: unpack the reply body, update quota and grant
 * state, and verify the bulk transfer (rc counts, sptlrpc unwrap,
 * checksum).  Note rc enters this function as number of bytes
 * transferred.  Returns 0 on success, -EAGAIN when the bulk should be
 * redone (unwrap or checksum failure), or another negative errno. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	u32 client_cksum = 0;
	ENTRY;

	/* -EDQUOT replies still carry a body we must process below */
	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		RETURN(rc);
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		RETURN(-EPROTO);
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	/* grant/quota processed; now propagate the -EDQUOT failure */
	if (rc < 0)
		RETURN(rc);

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		/* writes transfer no data back, so rc must be <= 0 here */
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			RETURN(-EPROTO);
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			RETURN(-EAGAIN);

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			RETURN(-EAGAIN);

		rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		GOTO(out, rc);
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0)
		GOTO(out, rc = -EAGAIN);

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		RETURN(-EPROTO);
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR ("Unexpected rc %d (%d transferred)\n",
			rc, req->rq_bulk->bd_nob_transferred);
		return (-EPROTO);
	}

	/* a short read means EOF on this stripe: zero-fill the remainder */
	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		u32        server_cksum = body->oa.o_cksum;
		char      *via = "";
		char      *router = "";
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		/* note the router in the message if the bulk arrived via one */
		if (peer->nid != req->rq_bulk->bd_sender) {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent ["LPU64"-"LPU64"]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count-1]->off +
					   aa->aa_ppga[aa->aa_page_count-1]->count -
									1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		/* rate-limit: log only when cksum_missed is a power of two */
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	RETURN(rc);
}
1431
/* Rebuild and requeue a BRW RPC after a recoverable error @rc.  A new
 * request is prepared with the same pages and obdo, the async args,
 * oaps and extents are moved from the old request to the new one, and
 * the new request is handed to ptlrpcd.  Returns 0 on success, -EINTR
 * if any async page was interrupted, or the prep error. */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	/* resend=1 so the server can recognize this as a recovery resend */
	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
					OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, aa->aa_ocapa, 0, 1);
	if (rc)
		RETURN(rc);

	/* abort the redo if any page's IO was interrupted */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
	else
		new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* repoint each oap's request reference at the new request */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* capa ownership moves to the new request's async args */
	new_aa->aa_ocapa = aa->aa_ocapa;
	aa->aa_ocapa = NULL;

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
1504
1505 /*
1506  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1507  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1508  * fine for our small page arrays and doesn't require allocation.  its an
1509  * insertion sort that swaps elements that are strides apart, shrinking the
1510  * stride down until its '1' and the array is sorted.
1511  */
1512 static void sort_brw_pages(struct brw_page **array, int num)
1513 {
1514         int stride, i, j;
1515         struct brw_page *tmp;
1516
1517         if (num == 1)
1518                 return;
1519         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1520                 ;
1521
1522         do {
1523                 stride /= 3;
1524                 for (i = stride ; i < num ; i++) {
1525                         tmp = array[i];
1526                         j = i;
1527                         while (j >= stride && array[j - stride]->off > tmp->off) {
1528                                 array[j] = array[j - stride];
1529                                 j -= stride;
1530                         }
1531                         array[j] = tmp;
1532                 }
1533         } while (stride > 1);
1534 }
1535
/* Free a brw_page pointer array (as allocated in osc_build_rpc()).  Only
 * the array itself is released here; the pages it points at are owned and
 * torn down elsewhere (via the osc_async_page/extent machinery). */
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1541
/*
 * Interpret callback for a BRW (bulk read/write) RPC.
 *
 * Called when the request completes.  Retries recoverable errors (possibly
 * resending the whole bulk), updates the cached cl_object attributes from
 * the reply on success, finishes every extent that was attached to this
 * RPC, drops the in-flight counter and re-plugs the IO queue.
 *
 * \param env  execution environment of the interpreting thread
 * \param req  the completed BRW request
 * \param data struct osc_brw_async_args stashed in req->rq_async_args
 * \param rc   RPC completion status
 *
 * \retval 0 on success (including a successfully queued resend)
 * \retval negative errno on unrecoverable failure
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import was evicted/reconnected since this RPC was
                         * built; do not resend across the eviction */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the resend was queued; this callback will
                 * run again for the new request */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* pga was sorted by offset, so the last entry has the
                 * highest offset in this RPC */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* propagate attributes the server marked valid in the reply
                 * into the cached cl_object attributes */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* successful writes leave pages unstable until the server commits;
         * account for them (balanced by brw_commit()/osc_dec_unstable_pages) */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1665
1666 static void brw_commit(struct ptlrpc_request *req)
1667 {
1668         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1669          * this called via the rq_commit_cb, I need to ensure
1670          * osc_dec_unstable_pages is still called. Otherwise unstable
1671          * pages may be leaked. */
1672         spin_lock(&req->rq_lock);
1673         if (likely(req->rq_unstable)) {
1674                 req->rq_unstable = 0;
1675                 spin_unlock(&req->rq_lock);
1676
1677                 osc_dec_unstable_pages(req);
1678         } else {
1679                 req->rq_committed = 1;
1680                 spin_unlock(&req->rq_lock);
1681         }
1682 }
1683
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the request takes ownership of the extents and pages (moved
 * onto the async-args lists) and is handed to a ptlrpcd thread.  On
 * failure every extent is finished with an error and all local
 * allocations are released.
 *
 * \param env      execution environment
 * \param cli      client obd the RPC is built for
 * \param ext_list list of OES_RPC extents to pack into one BRW request
 * \param cmd      OBD_BRW_WRITE or OBD_BRW_READ
 * \param pol      ptlrpcd selection policy for queueing the request
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the [starting_offset, ending_offset) byte
                         * range covered by this RPC; interior pages must be
                         * full pages (the LASSERTs below) */
                        if (starting_offset == OBD_OBJECT_EOF ||
                            starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        /* writes under memory pressure must not recurse into reclaim */
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the brw_page array and attach every page to the cl_req */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        /* server wants pages in ascending offset order */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req,
                                  crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* hand the page/extent lists over to the async args; from here on
         * brw_interpret() owns and releases them */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* bump the in-flight counter and record histogram stats under the
         * list lock (brw_interpret() decrements on completion) */
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        /* error path: undo allocations and fail every extent */
        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
1902
1903 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1904                                         struct ldlm_enqueue_info *einfo)
1905 {
1906         void *data = einfo->ei_cbdata;
1907         int set = 0;
1908
1909         LASSERT(lock != NULL);
1910         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1911         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1912         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1913         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1914
1915         lock_res_and_lock(lock);
1916
1917         if (lock->l_ast_data == NULL)
1918                 lock->l_ast_data = data;
1919         if (lock->l_ast_data == data)
1920                 set = 1;
1921
1922         unlock_res_and_lock(lock);
1923
1924         return set;
1925 }
1926
1927 static int osc_set_data_with_check(struct lustre_handle *lockh,
1928                                    struct ldlm_enqueue_info *einfo)
1929 {
1930         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1931         int set = 0;
1932
1933         if (lock != NULL) {
1934                 set = osc_set_lock_data_with_check(lock, einfo);
1935                 LDLM_LOCK_PUT(lock);
1936         } else
1937                 CERROR("lockh %p, data %p - client evicted?\n",
1938                        lockh, einfo->ei_cbdata);
1939         return set;
1940 }
1941
/*
 * Finish an OSC lock enqueue: translate an intent-aborted reply into its
 * real status, mark the LVB ready where appropriate, invoke the caller's
 * upcall with the final error code, and drop the enqueue reference on the
 * lock when it is no longer needed.
 *
 * \param errcode  result of ldlm_cli_enqueue{,_fini}()
 * \retval the upcall's return value
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* intent was handled without granting a lock; the real
                 * status is carried in lock_policy_res1 */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
1979
/*
 * Interpret callback for an asynchronous lock enqueue.
 *
 * Completes the DLM side of the enqueue (ldlm_cli_enqueue_fini()) and then
 * the OSC side (osc_enqueue_fini(), which runs the caller's upcall).  An
 * extra lock reference is held across both steps so a blocking AST for a
 * failed lock cannot run before the upcall has executed.
 *
 * \param aa  struct osc_enqueue_args stored in req->rq_async_args
 * \retval result of osc_enqueue_fini()
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                /* AGL enqueues pass no lvb/flags (see osc_enqueue_base());
                 * supply a local flags word for the fini calls below */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the reference taken above, now that the upcall has run */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2032
/* Sentinel rqset value: passing PTLRPCD_SET to osc_enqueue_base() queues
 * the request on a ptlrpcd daemon instead of a caller-owned request set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2034
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL (speculative) enqueues do not require a ready LVB for match */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * a DLM lock already exists, just inform the caller
                         * to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* lock belongs to another user; fall through and
                         * enqueue a fresh one */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        /* intent enqueues carry an LVB buffer in the reply */
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* stash everything osc_enqueue_interpret() needs to
                         * complete the enqueue later */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* synchronous path: finish the enqueue inline */
        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2186
/*
 * Look up an already-granted extent lock matching the given resource,
 * policy and mode, without enqueueing a new one.
 *
 * For LCK_PR requests an existing LCK_PW lock also matches (the VFS and
 * page cache protect readers locally, so many readers/writers can share
 * one PW lock).  If @data is given, the matched lock must own (or accept)
 * that ast data, otherwise the match is rejected.
 *
 * \param unref  passed through to ldlm_lock_match()
 * \retval matched lock mode (non-zero, with a reference held unless
 *         LDLM_FL_TEST_LOCK was set), or 0 if no lock matched
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   __u64 *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        if (!osc_set_data_with_check(lockh, data)) {
                                /* lock owned by someone else: drop the match
                                 * reference (unless it was a test) and fail */
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                /* requested PR but matched PW: move our reference from the
                 * PW mode to the PR mode the caller will release */
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2230
2231 static int osc_statfs_interpret(const struct lu_env *env,
2232                                 struct ptlrpc_request *req,
2233                                 struct osc_async_args *aa, int rc)
2234 {
2235         struct obd_statfs *msfs;
2236         ENTRY;
2237
2238         if (rc == -EBADR)
2239                 /* The request has in fact never been sent
2240                  * due to issues at a higher level (LOV).
2241                  * Exit immediately since the caller is
2242                  * aware of the problem and takes care
2243                  * of the clean up */
2244                  RETURN(rc);
2245
2246         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2247             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2248                 GOTO(out, rc = 0);
2249
2250         if (rc != 0)
2251                 GOTO(out, rc);
2252
2253         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2254         if (msfs == NULL) {
2255                 GOTO(out, rc = -EPROTO);
2256         }
2257
2258         *aa->aa_oi->oi_osfs = *msfs;
2259 out:
2260         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2261         RETURN(rc);
2262 }
2263
/**
 * Send an OST_STATFS request asynchronously through \a rqset.
 *
 * The reply is handled by osc_statfs_interpret(), which fills
 * oinfo->oi_osfs and calls oinfo->oi_cb_up().
 *
 * \note \a max_age is currently not transmitted; see the comment below.
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * otherwise they could deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2307
/**
 * Synchronous OST_STATFS: query the target's filesystem statistics and copy
 * them into \a osfs.
 *
 * \note \a max_age is currently not transmitted; see the comment below.
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request may also come from lprocfs, so take a reference on the
         * import under cl_sem to serialize against
         * client_disconnect_export() clearing it (bug 15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request holds its own import reference from here on. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * otherwise they could deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2371
/**
 * Handle OBD ioctls directed at this OSC device.
 *
 * A module reference is held for the duration of the call so the module
 * cannot be unloaded while an ioctl is in flight.
 *
 * \retval 0 on success, -ENOTTY for unknown commands, negative errno
 *         otherwise
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                /* force reconnection to the target named in ioc_inlbuf1 */
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive values only indicate recovery progress, not an
                 * error, so report success */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                /* activate/deactivate the import; ioc_offset carries the
                 * desired state */
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2408
/**
 * Handle set_info requests for this OSC device.
 *
 * Several keys are handled purely on the client (checksum toggle, sptlrpc
 * configuration, context flush, LRU cache registration and shrinking); all
 * remaining keys are forwarded to the OST as an OST_SET_INFO (or, for
 * KEY_GRANT_SHRINK, OST_SET_GRANT_INFO) RPC.
 *
 * \param[in] set  request set to add the RPC to; must be non-NULL for all
 *                 forwarded keys except KEY_GRANT_SHRINK, which goes through
 *                 ptlrpcd instead
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Toggle BRW checksumming; handled locally, never sent to the OST. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Attach this OSC to the shared client-side page cache (LRU). */
        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        /* Shrink this OSC's LRU; *val is the number of pages requested on
         * entry and is decremented by the number actually freed. */
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* free at most half of what is currently on the LRU */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* buffer sizes must be set before packing the request */
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant shrink carries an ost_body instead of a raw value buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                /* keep a private copy of the obdo for the reply interpreter,
                 * since val belongs to the caller */
                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                /* grant shrink is fire-and-forget via ptlrpcd */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2527
/**
 * Fill in connect data for a reconnect: request enough grant from the
 * server to cover what we currently hold, and reset lost-grant accounting.
 *
 * \retval 0 always
 */
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                spin_lock(&cli->cl_loi_list_lock);
                /* Ask for the grant we still hold (available plus what dirty
                 * pages consume); if that is zero, ask for two full BRW RPCs
                 * worth as a reasonable starting point. */
                data->ocd_grant = (cli->cl_avail_grant +
                                  (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
                                  2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}
2554
/**
 * Disconnect the export and, once the import is gone, remove this client
 * from the grant-shrink list.  The ordering below is deliberate — see the
 * race description in the comment.
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2582
/**
 * React to import state changes (disconnect, invalidation, activation, ...)
 * and forward the corresponding OBD_NOTIFY_* event to the observer.
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* server is gone: any grant we held is no longer valid */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        /* drop all locks; the server no longer knows them */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2656
2657 /**
2658  * Determine whether the lock can be canceled before replaying the lock
2659  * during recovery, see bug16774 for detailed information.
2660  *
2661  * \retval zero the lock can't be canceled
2662  * \retval other ok to cancel
2663  */
2664 static int osc_cancel_weight(struct ldlm_lock *lock)
2665 {
2666         /*
2667          * Cancel all unused and granted extent lock.
2668          */
2669         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2670             lock->l_granted_mode == lock->l_req_mode &&
2671             osc_ldlm_weigh_ast(lock) == 0)
2672                 RETURN(1);
2673
2674         RETURN(0);
2675 }
2676
2677 static int brw_queue_work(const struct lu_env *env, void *data)
2678 {
2679         struct client_obd *cli = data;
2680
2681         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2682
2683         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2684         RETURN(0);
2685 }
2686
/**
 * Set up an OSC device: client obd state, ptlrpcd writeback/LRU work items,
 * quota, procfs entries and the request pool.
 *
 * On failure all resources acquired so far are released via the goto
 * unwind chain at the bottom.
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* async writeback flushing */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* async LRU page reclaim */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2778
/**
 * Staged pre-cleanup of the OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the ptlrpcd work items, cleans up the
 * client import and tears down procfs.
 *
 * \retval 0 always
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
                }
        }
        RETURN(rc);
}
2824
2825 int osc_cleanup(struct obd_device *obd)
2826 {
2827         struct client_obd *cli = &obd->u.cli;
2828         int rc;
2829
2830         ENTRY;
2831
2832         /* lru cleanup */
2833         if (cli->cl_cache != NULL) {
2834                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
2835                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2836                 list_del_init(&cli->cl_lru_osc);
2837                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2838                 cli->cl_lru_left = NULL;
2839                 cl_cache_decref(cli->cl_cache);
2840                 cli->cl_cache = NULL;
2841         }
2842
2843         /* free memory of osc quota cache */
2844         osc_quota_cleanup(obd);
2845
2846         rc = client_obd_cleanup(obd);
2847
2848         ptlrpcd_decref();
2849         RETURN(rc);
2850 }
2851
2852 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2853 {
2854         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2855         return rc > 0 ? 0: rc;
2856 }
2857
2858 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
2859 {
2860         return osc_process_config_base(obd, buf);
2861 }
2862
/* Method table registered with the OBD layer for the OSC device type;
 * generic client_* helpers are used where no OSC-specific behaviour is
 * needed. */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
};
2885
2886 static int __init osc_init(void)
2887 {
2888         bool enable_proc = true;
2889         struct obd_type *type;
2890         int rc;
2891         ENTRY;
2892
2893         /* print an address of _any_ initialized kernel symbol from this
2894          * module, to allow debugging with gdb that doesn't support data
2895          * symbols from modules.*/
2896         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2897
2898         rc = lu_kmem_init(osc_caches);
2899         if (rc)
2900                 RETURN(rc);
2901
2902         type = class_search_type(LUSTRE_OSP_NAME);
2903         if (type != NULL && type->typ_procsym != NULL)
2904                 enable_proc = false;
2905
2906         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2907                                  LUSTRE_OSC_NAME, &osc_device_type);
2908         if (rc) {
2909                 lu_kmem_fini(osc_caches);
2910                 RETURN(rc);
2911         }
2912
2913         RETURN(rc);
2914 }
2915
/* Module exit: unregister the device type, then release the slab caches.
 * (Not marked __exit; presumably kept callable from error paths — TODO
 * confirm.) */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
2921
2922 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2923 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
2924 MODULE_VERSION(LUSTRE_VERSION_STRING);
2925 MODULE_LICENSE("GPL");
2926
2927 module_init(osc_init);
2928 module_exit(osc_exit);