Whamcloud - gitweb
LU-5814 obd: remove unused LSM parameters
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC state for an async bulk read/write (BRW) request.  Stored in the
 * request's rq_async_args and consumed by brw_interpret() when the reply
 * arrives.  Also reused (as osc_grant_args) for grant-shrink RPCs, where
 * only aa_oa is meaningful. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;           /* object attributes for this I/O */
        int                       aa_requested_nob; /* bytes requested in this RPC */
        int                       aa_nio_count;    /* remote niobuf count — confirm at RPC assembly */
        u32                       aa_page_count;   /* entries in aa_ppga */
        int                       aa_resends;      /* resend attempts so far */
        struct brw_page **aa_ppga;                 /* pages making up the bulk */
        struct client_obd        *aa_cli;          /* owning client obd */
        struct list_head          aa_oaps;         /* async pages in this RPC — see brw_interpret */
        struct list_head          aa_exts;         /* extents covered by this RPC */
        struct obd_capa  *aa_ocapa;                /* OSS capability, may be NULL */
        struct cl_req            *aa_clerq;        /* cl_req transfer descriptor */
};
68
69 #define osc_grant_args osc_brw_async_args
70
/* Callback state for async OST_SETATTR/OST_PUNCH requests; stored in
 * rq_async_args and consumed by osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;     /* attrs refreshed from the reply */
        obd_enqueue_update_f     sa_upcall; /* completion upcall */
        void                    *sa_cookie; /* opaque argument for sa_upcall */
};
76
/* Callback state for async OST_SYNC requests; stored in rq_async_args and
 * consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;             /* oi_oa receives the replied obdo */
        obd_enqueue_update_f     fa_upcall; /* completion upcall */
        void                    *fa_cookie; /* opaque argument for fa_upcall */
};
82
/* State preserved across an asynchronous DLM lock enqueue, so the reply
 * interpreter can complete the upcall.  NOTE(review): field semantics are
 * inferred from names; the consumer is not visible in this chunk. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;    /* export the lock was requested on */
        ldlm_type_t             oa_type;    /* DLM lock type */
        ldlm_mode_t             oa_mode;    /* requested lock mode */
        __u64                   *oa_flags;  /* in/out LDLM_FL_* flags */
        osc_enqueue_upcall_f    oa_upcall;  /* completion upcall */
        void                    *oa_cookie; /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;    /* lock value block from the server */
        struct lustre_handle    oa_lockh;   /* handle of the granted lock */
        unsigned int            oa_agl:1;   /* asynchronous glimpse lock request */
};
94
95 static void osc_release_ppga(struct brw_page **ppga, size_t count);
96 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
97                          void *data, int rc);
98
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100                                  struct ost_body *body, void *capa)
101 {
102         struct obd_capa *oc = (struct obd_capa *)capa;
103         struct lustre_capa *c;
104
105         if (!capa)
106                 return;
107
108         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
109         LASSERT(c);
110         capa_cpy(c, oc);
111         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112         DEBUG_CAPA(D_SEC, c, "pack");
113 }
114
115 void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
116 {
117         struct ost_body *body;
118
119         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
120         LASSERT(body);
121
122         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
123                              oinfo->oi_oa);
124         osc_pack_capa(req, body, oinfo->oi_capa);
125 }
126
127 void osc_set_capa_size(struct ptlrpc_request *req,
128                        const struct req_msg_field *field,
129                        struct obd_capa *oc)
130 {
131         if (oc == NULL)
132                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
133         else
134                 /* it is already calculated as sizeof struct obd_capa */
135                 ;
136 }
137
138 int osc_getattr_interpret(const struct lu_env *env,
139                           struct ptlrpc_request *req,
140                           struct osc_async_args *aa, int rc)
141 {
142         struct ost_body *body;
143         ENTRY;
144
145         if (rc != 0)
146                 GOTO(out, rc);
147
148         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
149         if (body) {
150                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
151                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
152                                      aa->aa_oi->oi_oa, &body->oa);
153
154                 /* This should really be sent by the OST */
155                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
156                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
157         } else {
158                 CDEBUG(D_INFO, "can't unpack ost_body\n");
159                 rc = -EPROTO;
160                 aa->aa_oi->oi_oa->o_valid = 0;
161         }
162 out:
163         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
164         RETURN(rc);
165 }
166
167 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
168                        struct obd_info *oinfo)
169 {
170         struct ptlrpc_request *req;
171         struct ost_body       *body;
172         int                    rc;
173         ENTRY;
174
175         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
176         if (req == NULL)
177                 RETURN(-ENOMEM);
178
179         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
180         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
181         if (rc) {
182                 ptlrpc_request_free(req);
183                 RETURN(rc);
184         }
185
186         osc_pack_req_body(req, oinfo);
187
188         ptlrpc_request_set_replen(req);
189
190         rc = ptlrpc_queue_wait(req);
191         if (rc)
192                 GOTO(out, rc);
193
194         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
195         if (body == NULL)
196                 GOTO(out, rc = -EPROTO);
197
198         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
199         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
200                              &body->oa);
201
202         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
203         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
204
205         EXIT;
206  out:
207         ptlrpc_req_finished(req);
208         return rc;
209 }
210
211 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
212                        struct obd_info *oinfo, struct obd_trans_info *oti)
213 {
214         struct ptlrpc_request *req;
215         struct ost_body       *body;
216         int                    rc;
217         ENTRY;
218
219         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
220
221         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
222         if (req == NULL)
223                 RETURN(-ENOMEM);
224
225         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
226         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
227         if (rc) {
228                 ptlrpc_request_free(req);
229                 RETURN(rc);
230         }
231
232         osc_pack_req_body(req, oinfo);
233
234         ptlrpc_request_set_replen(req);
235
236         rc = ptlrpc_queue_wait(req);
237         if (rc)
238                 GOTO(out, rc);
239
240         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
241         if (body == NULL)
242                 GOTO(out, rc = -EPROTO);
243
244         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
245                              &body->oa);
246
247         EXIT;
248 out:
249         ptlrpc_req_finished(req);
250         RETURN(rc);
251 }
252
253 static int osc_setattr_interpret(const struct lu_env *env,
254                                  struct ptlrpc_request *req,
255                                  struct osc_setattr_args *sa, int rc)
256 {
257         struct ost_body *body;
258         ENTRY;
259
260         if (rc != 0)
261                 GOTO(out, rc);
262
263         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
264         if (body == NULL)
265                 GOTO(out, rc = -EPROTO);
266
267         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
268                              &body->oa);
269 out:
270         rc = sa->sa_upcall(sa->sa_cookie, rc);
271         RETURN(rc);
272 }
273
/*
 * Send an OST_SETATTR request without blocking the caller.
 *
 * \param exp     export to send the RPC through
 * \param oinfo   obdo to pack plus an optional capability
 * \param upcall  completion callback run from osc_setattr_interpret()
 * \param cookie  opaque argument passed to \a upcall
 * \param rqset   NULL for fire-and-forget via ptlrpcd (no interpret, no
 *                upcall); PTLRPCD_SET to run through ptlrpcd with the
 *                interpret callback; otherwise a set the caller waits on
 *
 * \retval 0 on successful submission, negative errno on packing failure
 */
int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* shrink the capability field when no capa is supplied */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                /* register the interpreter and its arguments before the
                 * request becomes visible to ptlrpcd/the set */
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
320
/*
 * Create an object on the OST.  The LASSERT on the sequence restricts this
 * path to echo-client objects.
 *
 * \param oa   in: identity (group/sequence) of the object to create;
 *             out: attributes of the created object from the reply
 * \param oti  if non-NULL and the reply carries a llog cookie
 *             (OBD_MD_FLCOOKIE), the cookie is saved into it
 *
 * \retval 0 on success, negative errno on failure
 */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* orphan-cleanup creates must not linger or be replayed */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* block size is not sent by the OST; synthesize it locally */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* save the unlink llog cookie, if the server returned one, so the
         * caller can cancel the record later */
        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
388
389 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
390                    obd_enqueue_update_f upcall, void *cookie,
391                    struct ptlrpc_request_set *rqset)
392 {
393         struct ptlrpc_request   *req;
394         struct osc_setattr_args *sa;
395         struct ost_body         *body;
396         int                      rc;
397         ENTRY;
398
399         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
400         if (req == NULL)
401                 RETURN(-ENOMEM);
402
403         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
404         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405         if (rc) {
406                 ptlrpc_request_free(req);
407                 RETURN(rc);
408         }
409         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
410         ptlrpc_at_set_req_timeout(req);
411
412         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
413         LASSERT(body);
414         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
415                              oinfo->oi_oa);
416         osc_pack_capa(req, body, oinfo->oi_capa);
417
418         ptlrpc_request_set_replen(req);
419
420         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
421         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
422         sa = ptlrpc_req_async_args(req);
423         sa->sa_oa     = oinfo->oi_oa;
424         sa->sa_upcall = upcall;
425         sa->sa_cookie = cookie;
426         if (rqset == PTLRPCD_SET)
427                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
428         else
429                 ptlrpc_set_add_req(rqset, req);
430
431         RETURN(0);
432 }
433
434 static int osc_sync_interpret(const struct lu_env *env,
435                               struct ptlrpc_request *req,
436                               void *arg, int rc)
437 {
438         struct osc_fsync_args *fa = arg;
439         struct ost_body *body;
440         ENTRY;
441
442         if (rc)
443                 GOTO(out, rc);
444
445         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
446         if (body == NULL) {
447                 CERROR ("can't unpack ost_body\n");
448                 GOTO(out, rc = -EPROTO);
449         }
450
451         *fa->fa_oi->oi_oa = body->oa;
452 out:
453         rc = fa->fa_upcall(fa->fa_cookie, rc);
454         RETURN(rc);
455 }
456
/*
 * Submit an asynchronous OST_SYNC request.  The obdo's size/blocks fields
 * are overloaded by the caller to carry the start/end of the range to
 * sync.  osc_sync_interpret() copies the replied obdo back and invokes
 * \a upcall(\a cookie, rc).
 *
 * \param rqset  PTLRPCD_SET to run via ptlrpcd, otherwise a set the
 *               caller waits on (NULL is not handled here — TODO confirm
 *               callers never pass NULL)
 *
 * \retval 0 on submission, negative errno on packing failure
 */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        /* async-args storage must fit our callback state */
        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
501
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list.
 *
 * Used for early lock cancellation (ELC): cancel conflicting local locks
 * before an operation (e.g. destroy) so they can be piggy-backed on the
 * request instead of requiring separate cancel RPCs. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* build the resource name from the object id and look it up */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        /* hold a debug reference across the local cancel scan */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
536
537 static int osc_destroy_interpret(const struct lu_env *env,
538                                  struct ptlrpc_request *req, void *data,
539                                  int rc)
540 {
541         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
542
543         atomic_dec(&cli->cl_destroy_in_flight);
544         wake_up(&cli->cl_destroy_waitq);
545         return 0;
546 }
547
/* Throttle destroy RPCs: reserve an in-flight slot if fewer than
 * cl_max_rpcs_in_flight destroys are outstanding.
 *
 * \retval 1 slot reserved, the caller may send (osc_destroy_interpret()
 *           releases the slot on completion)
 * \retval 0 over the limit; the reservation is rolled back and the caller
 *           should wait on cl_destroy_waitq */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* undo the optimistic increment; if the counter dropped below the
         * limit between the two atomics, another waiter may now proceed */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
565
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct obd_trans_info *oti)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel conflicting PW locks up front (ELC) and discard their
         * cached data -- it belongs to an object being destroyed */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        /* no capability is sent with destroy */
        osc_set_capa_size(req, &RMF_CAPA1, NULL);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* forward the MDS unlink-llog cookie so the OST can cancel the
         * recovery record once the destroy commits */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
642
/* Fill the dirty-cache and grant accounting fields of @oa, under
 * cl_loi_list_lock, so the server learns how much cache this client holds:
 * o_dirty = current dirty bytes, o_undirty = additional grant wanted (0 on
 * accounting anomalies), o_grant = grant currently held, o_dropped = grant
 * lost (reset to 0 here).  @writing_bytes is unused in this function. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        /* the caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* ask for enough grant to keep a full pipeline of RPCs
                 * in flight, but at least the configured dirty maximum */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
691
/* Re-arm the grant-shrink timer: schedule the next shrink attempt one
 * cl_grant_shrink_interval from now. */
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
699
/* Add @grant bytes back to the client's available grant, under
 * cl_loi_list_lock. */
static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
706
707 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
708 {
709         if (body->oa.o_valid & OBD_MD_FLGRANT) {
710                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
711                 __osc_update_grant(cli, body->oa.o_grant);
712         }
713 }
714
715 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
716                               u32 keylen, void *key,
717                               u32 vallen, void *val,
718                               struct ptlrpc_request_set *set);
719
/* Completion handler for a grant-shrink set_info RPC.  On failure the
 * grant we tried to give back is restored locally; on success any extra
 * grant in the reply is absorbed.  Frees the obdo allocated by the sender
 * of the request in either case. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink failed: take the grant back */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
740
/* Give back a quarter of the available grant: move it from cl_avail_grant
 * into oa->o_grant (under cl_loi_list_lock), mark the obdo with
 * OBD_FL_SHRINK_GRANT so the server treats it as a return, and re-arm the
 * shrink timer. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        /* make sure o_flags is valid before OR-ing in the shrink flag */
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
754
755 /* Shrink the current grant, either from some large amount to enough for a
756  * full set of in-flight RPCs, or if we have already shrunk to that limit
757  * then to enough for a single RPC.  This avoids keeping more grant than
758  * needed, and avoids shrinking the grant piecemeal. */
759 static int osc_shrink_grant(struct client_obd *cli)
760 {
761         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
762                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
763
764         spin_lock(&cli->cl_loi_list_lock);
765         if (cli->cl_avail_grant <= target_bytes)
766                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
767         spin_unlock(&cli->cl_loi_list_lock);
768
769         return osc_shrink_grant_to_target(cli, target_bytes);
770 }
771
/* Shrink the client's grant down to @target_bytes by sending the excess
 * back to the server via a KEY_GRANT_SHRINK set_info RPC.  The target is
 * clamped to at least one full RPC's worth of grant; nothing is done if
 * we already hold no more than the target.  On RPC submission failure the
 * grant is restored locally.  Returns 0 or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        /* report current dirty/grant state alongside the returned grant */
        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        /* make sure o_flags is valid before OR-ing in the shrink flag */
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* could not send: take the grant back */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
816
817 static int osc_should_shrink_grant(struct client_obd *client)
818 {
819         cfs_time_t time = cfs_time_current();
820         cfs_time_t next_shrink = client->cl_next_shrink_grant;
821
822         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
823              OBD_CONNECT_GRANT_SHRINK) == 0)
824                 return 0;
825
826         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
827                 /* Get the current RPC size directly, instead of going via:
828                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
829                  * Keep comment here so that it can be found by searching. */
830                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
831
832                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
833                     client->cl_avail_grant > brw_size)
834                         return 1;
835                 else
836                         osc_update_next_shrink(client);
837         }
838         return 0;
839 }
840
841 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
842 {
843         struct client_obd *client;
844
845         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
846                 if (osc_should_shrink_grant(client))
847                         osc_shrink_grant(client);
848         }
849         return 0;
850 }
851
852 static int osc_add_shrink_grant(struct client_obd *client)
853 {
854         int rc;
855
856         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
857                                        TIMEOUT_GRANT,
858                                        osc_grant_shrink_grant_cb, NULL,
859                                        &client->cl_grant_shrink_list);
860         if (rc) {
861                 CERROR("add grant client %s error %d\n",
862                         client->cl_import->imp_obd->obd_name, rc);
863                 return rc;
864         }
865         CDEBUG(D_CACHE, "add grant client %s \n",
866                client->cl_import->imp_obd->obd_name);
867         osc_update_next_shrink(client);
868         return 0;
869 }
870
871 static int osc_del_shrink_grant(struct client_obd *client)
872 {
873         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
874                                          TIMEOUT_GRANT);
875 }
876
/**
 * Initialize client-side grant accounting from the connect data returned
 * by the server, and (re)register for periodic grant shrinking when the
 * server supports it.
 *
 * \param[in] cli   client obd whose grant state is (re)initialized
 * \param[in] ocd   connect data from the server; supplies ocd_grant and
 *                  ocd_blocksize
 */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        /* Negative grant means the server granted less than we had dirty. */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* list_empty() check keeps this idempotent across reconnects. */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
916
917 /* We assume that the reason this OSC got a short read is because it read
918  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
919  * via the LOV, and it _knows_ it's reading inside the file, it's just that
920  * this stripe never got written at or beyond this stripe offset yet. */
921 static void handle_short_read(int nob_read, size_t page_count,
922                               struct brw_page **pga)
923 {
924         char *ptr;
925         int i = 0;
926
927         /* skip bytes read OK */
928         while (nob_read > 0) {
929                 LASSERT (page_count > 0);
930
931                 if (pga[i]->count > nob_read) {
932                         /* EOF inside this page */
933                         ptr = kmap(pga[i]->pg) +
934                                 (pga[i]->off & ~PAGE_MASK);
935                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
936                         kunmap(pga[i]->pg);
937                         page_count--;
938                         i++;
939                         break;
940                 }
941
942                 nob_read -= pga[i]->count;
943                 page_count--;
944                 i++;
945         }
946
947         /* zero remaining pages */
948         while (page_count-- > 0) {
949                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
950                 memset(ptr, 0, pga[i]->count);
951                 kunmap(pga[i]->pg);
952                 i++;
953         }
954 }
955
956 static int check_write_rcs(struct ptlrpc_request *req,
957                            int requested_nob, int niocount,
958                            size_t page_count, struct brw_page **pga)
959 {
960         int     i;
961         __u32   *remote_rcs;
962
963         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
964                                                   sizeof(*remote_rcs) *
965                                                   niocount);
966         if (remote_rcs == NULL) {
967                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
968                 return(-EPROTO);
969         }
970
971         /* return error if any niobuf was in error */
972         for (i = 0; i < niocount; i++) {
973                 if ((int)remote_rcs[i] < 0)
974                         return(remote_rcs[i]);
975
976                 if (remote_rcs[i] != 0) {
977                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
978                                 i, remote_rcs[i], req);
979                         return(-EPROTO);
980                 }
981         }
982
983         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
984                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
985                        req->rq_bulk->bd_nob_transferred, requested_nob);
986                 return(-EPROTO);
987         }
988
989         return (0);
990 }
991
992 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
993 {
994         if (p1->flag != p2->flag) {
995                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
996                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
997                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
998
999                 /* warn if we try to combine flags that we don't know to be
1000                  * safe to combine */
1001                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1002                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1003                               "report this at https://jira.hpdd.intel.com/\n",
1004                               p1->flag, p2->flag);
1005                 }
1006                 return 0;
1007         }
1008
1009         return (p1->off + p1->count == p2->off);
1010 }
1011
/**
 * Compute the checksum of the first \a nob bytes of the bulk described by
 * the \a pga page array, using the hash selected by \a cksum_type.
 *
 * \param[in] nob         number of bytes to checksum
 * \param[in] pg_count    number of entries in \a pga
 * \param[in] pga         pages making up the bulk
 * \param[in] opc         OST_READ or OST_WRITE; selects which fault-injection
 *                        hook may fire below
 * \param[in] cksum_type  wire checksum type, mapped to a hash algorithm
 *
 * \retval the checksum value
 * \retval PTR_ERR(hdesc) if the hash could not be initialized.
 *         NOTE(review): that error is negative but the return type is u32,
 *         so callers see it as a (bogus) checksum value — verify callers
 *         tolerate this.
 */
static u32 osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        u32                             cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* Only hash up to nob bytes of the final (partial) page. */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                /* nob is decremented by the full page count (not the clipped
                 * count), which drives it <= 0 and terminates the loop. */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        /* NOTE(review): err is never checked; if finalization could fail,
         * cksum would be used uninitialized — confirm
         * cfs_crypto_hash_final cannot fail for these algorithms. */
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1066
/**
 * Build a BRW (bulk read/write) RPC for the given page array.
 *
 * \param[in]  cmd        OBD_BRW_WRITE or OBD_BRW_READ (tested as a bitmask)
 * \param[in]  cli        client obd issuing the I/O
 * \param[in]  oa         obdo describing the target object; on write with
 *                        checksums enabled, its o_cksum/o_flags/o_valid are
 *                        updated for later verification on reply
 * \param[in]  page_count number of entries in \a pga (must be > 0)
 * \param[in]  pga        pages to transfer, sorted by offset with no gaps
 *                        in the middle (asserted below)
 * \param[out] reqp       the prepared request on success
 * \param[in]  ocapa      capability for the object, may be NULL
 * \param[in]  reserve    when non-zero (and \a ocapa != NULL), take a capa
 *                        reference and stash it in the async args
 * \param[in]  resend     non-zero on a resend; marks the request with
 *                        OBD_FL_RECOV_RESEND
 *
 * \retval 0 on success (with *reqp set), negative errno on failure
 */
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, struct obd_capa *ocapa,
                     int reserve, int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* writes draw from the pre-allocated pool so that dirty
                 * data can still be flushed under memory pressure */
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count contiguous runs of pages: each run of mergeable pages
         * shares a single remote niobuf */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* attach each page to the bulk and build the niobuf array,
         * merging contiguous pages into a single niobuf entry */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity check: we must have filled exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a grant shrink on the RPC if one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* stash everything reply processing will need in the async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1272
/**
 * Diagnose a bulk-write checksum mismatch reported by the server.
 *
 * Re-checksums the pages still held by the client (with the type the server
 * used) to narrow down where the data changed, and logs a diagnosis.
 *
 * \param[in] oa                 reply obdo (carries server's cksum flags)
 * \param[in] peer               server peer, for logging
 * \param[in] client_cksum       checksum computed when the RPC was built
 * \param[in] server_cksum       checksum reported by the server
 * \param[in] nob                number of bytes that were checksummed
 * \param[in] page_count         number of entries in \a pga
 * \param[in] pga                the written pages
 * \param[in] client_cksum_type  type used for the original client checksum
 *
 * \retval 0 when client and server checksums actually agree
 * \retval 1 on a genuine mismatch (the caller treats this as retryable)
 */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                size_t page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* redo the checksum with the type the server said it used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        /* classify the mismatch by comparing the freshly computed value
         * against both the original client and the server checksums */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1317
/**
 * Process the reply of a BRW RPC.
 *
 * Note rc enters this function as number of bytes transferred (or a
 * negative errno from the RPC layer); it leaves as 0 or a negative errno.
 * For writes, the per-niobuf return codes and (optionally) the write
 * checksum are verified; for reads, short reads are zero-filled and the
 * read checksum is verified.  Quota flags and grant are updated from the
 * reply in both cases.  -EAGAIN tells the caller to redo the request.
 */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we need to process below */
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        /* now that grant/quota are processed, fail out on -EDQUOT */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                /* -EAGAIN here triggers a resend of the (still held) data */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero-fill the pages beyond what was transferred */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32        server_cksum = body->oa.o_cksum;
                char      *via = "";
                char      *router = "";
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
                                               body->oa.o_flags : 0);
                /* checksum only the bytes actually transferred (rc) */
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* note if the bulk arrived through an LNet router */
                if (peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent ["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* log only at power-of-two miss counts to avoid spam */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* on success, copy the server's view of the obdo back to the caller */
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}
1467
/**
 * Rebuild and resend a BRW RPC after a recoverable error \a rc.
 *
 * A brand-new request is prepared from the async args of the failed
 * \a request; the page array, oap/extent lists, capa and resend count are
 * transferred to the new request, which is then handed to ptlrpcd.
 *
 * \retval 0 when the new request was queued
 * \retval -EINTR if any of the pages was interrupted
 * \retval negative errno if the new request could not be prepared
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        /* resend=1 marks the new RPC with OBD_FL_RECOV_RESEND */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* abort the redo if any page was interrupted while waiting */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* move (not copy) the oap and extent lists onto the new args */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* repoint each oap at the new request (dropping the old ref) */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capa ownership moves to the new request's args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1540
1541 /*
1542  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1543  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1544  * fine for our small page arrays and doesn't require allocation.  its an
1545  * insertion sort that swaps elements that are strides apart, shrinking the
1546  * stride down until its '1' and the array is sorted.
1547  */
1548 static void sort_brw_pages(struct brw_page **array, int num)
1549 {
1550         int stride, i, j;
1551         struct brw_page *tmp;
1552
1553         if (num == 1)
1554                 return;
1555         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1556                 ;
1557
1558         do {
1559                 stride /= 3;
1560                 for (i = stride ; i < num ; i++) {
1561                         tmp = array[i];
1562                         j = i;
1563                         while (j >= stride && array[j - stride]->off > tmp->off) {
1564                                 array[j] = array[j - stride];
1565                                 j -= stride;
1566                         }
1567                         array[j] = tmp;
1568                 }
1569         } while (stride > 1);
1570 }
1571
1572 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1573 {
1574         LASSERT(ppga != NULL);
1575         OBD_FREE(ppga, sizeof(*ppga) * count);
1576 }
1577
/* Interpret callback (rq_interpret_reply) for BRW RPCs.
 *
 * Finishes the bulk request, retries recoverable errors, and on success
 * propagates the returned object attributes (blocks/times, and size/KMS
 * for writes) into the cl_object.  Finally tears down the request's
 * extents, pages and bookkeeping, and kicks the IO engine again.
 *
 * \retval 0 if a resend was queued or the request completed cleanly,
 *         negative errno otherwise.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import was evicted/reconnected since this request
                         * was sent: do not resend across the eviction */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 here means osc_brw_redo_request() queued a resend:
                 * the new request owns everything, so stop processing now */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* pga was sorted by offset, so the last entry is the page
                 * with the highest file offset in this RPC */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* copy the attributes the server marked valid in the reply
                 * obdo into the cl_object's cached attributes */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* successful write: account its pages as "unstable" until the
         * server commits them (see brw_commit()) */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        /* an RPC slot was freed; try to build and send the next one */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1701
/* Commit callback (rq_commit_cb) for BRW requests: called when the server
 * reports the transaction committed, at which point the written pages are
 * no longer "unstable".  Under rq_lock, either drop the unstable-page
 * accounting now (rq_unstable already set by the completion path), or mark
 * the request committed so the completion path knows not to add it. */
static void brw_commit(struct ptlrpc_request *req)
{
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this called via the rq_commit_cb, I need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        spin_lock(&req->rq_lock);
        if (likely(req->rq_unstable)) {
                req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);

                osc_dec_unstable_pages(req);
        } else {
                req->rq_committed = 1;
                spin_unlock(&req->rq_lock);
        }
}
1719
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the extents and pages are transferred to the request's async
 * args (@ext_list is left empty) and the request is queued to ptlrpcd;
 * completion is handled by brw_interpret()/brw_commit().  On failure all
 * extents are finished with the error.
 *
 * \param env      execution environment
 * \param cli      client obd the extents belong to
 * \param ext_list list of osc_extent in OES_RPC state
 * \param cmd      OBD_BRW_READ or OBD_BRW_WRITE (plus optional flags)
 * \param pol      ptlrpcd scheduling policy
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        loff_t                          starting_offset = OBD_OBJECT_EOF;
        loff_t                          ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* track the overall [starting_offset, ending_offset)
                         * range; interior pages must be full pages */
                        if (starting_offset == OBD_OBJECT_EOF ||
                            starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the brw_page array and attach each page to the cl_req */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        /* the target wants offsets in ascending order (disk allocation) */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req,
                                  crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* hand the page/extent lists over to the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* update in-flight counters and lprocfs histograms */
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
1938
1939 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1940                                         struct ldlm_enqueue_info *einfo)
1941 {
1942         void *data = einfo->ei_cbdata;
1943         int set = 0;
1944
1945         LASSERT(lock != NULL);
1946         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1947         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1948         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1949         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1950
1951         lock_res_and_lock(lock);
1952
1953         if (lock->l_ast_data == NULL)
1954                 lock->l_ast_data = data;
1955         if (lock->l_ast_data == data)
1956                 set = 1;
1957
1958         unlock_res_and_lock(lock);
1959
1960         return set;
1961 }
1962
1963 static int osc_set_data_with_check(struct lustre_handle *lockh,
1964                                    struct ldlm_enqueue_info *einfo)
1965 {
1966         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
1967         int set = 0;
1968
1969         if (lock != NULL) {
1970                 set = osc_set_lock_data_with_check(lock, einfo);
1971                 LDLM_LOCK_PUT(lock);
1972         } else
1973                 CERROR("lockh %p, data %p - client evicted?\n",
1974                        lockh, einfo->ei_cbdata);
1975         return set;
1976 }
1977
/* Common tail of an OST lock enqueue (sync and async paths).
 *
 * Post-processes the reply for intent enqueues, marks the LVB ready where
 * appropriate, invokes the caller's upcall with the final status, and
 * drops the lock reference taken by ldlm_cli_enqueue() when the lock was
 * actually granted/matched.
 *
 * \param req     the enqueue request (reply already received)
 * \param upcall  caller callback, invoked with (cookie, lockh, errcode)
 * \param cookie  opaque argument for @upcall
 * \param lockh   handle of the (possibly) granted lock
 * \param mode    lock mode the reference was taken in
 * \param flags   enqueue flags; LDLM_FL_LVB_READY may be added
 * \param agl     non-zero for AGL (speculative) enqueues, which must not
 *                set LVB_READY on an aborted intent
 * \param errcode enqueue result from ldlm
 *
 * \retval the upcall's return value
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                /* intent was handled without granting a lock; the real
                 * status travels in lock_policy_res1 */
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
2015
/* Interpret callback for asynchronous OST lock enqueues.
 *
 * Completes the ldlm enqueue (ldlm_cli_enqueue_fini) and then the osc
 * bookkeeping (osc_enqueue_fini), holding an extra lock reference so a
 * blocking AST posted for a failed lock cannot arrive before the upcall
 * has run.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                /* AGL enqueues carry no flags/LVB of their own (see
                 * osc_enqueue_base()); supply a local flags word so the
                 * fini paths have something to update */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the extra reference taken above */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2068
/* Sentinel value for the @rqset argument of osc_enqueue_base(): callers
 * pass PTLRPCD_SET to have the async request handed to ptlrpcd instead of
 * being added to a real set.  Only compared against, never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2070
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained.
 *
 * Enqueue an extent lock on an OST object: first try to match an already
 * cached lock (only when @kms_valid), otherwise send a (possibly intent,
 * possibly async) LDLM_ENQUEUE.  @upcall is invoked with the final status;
 * for async enqueues that happens from osc_enqueue_interpret().  @agl marks
 * a speculative (asynchronous glimpse) enqueue which is cancelled with
 * -ECANCELED if a matching lock already exists. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * it already exists a DLM lock, it will just inform
                         * the caller to cancel the AGL process for this
                         * stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* lock already carries someone else's ast_data;
                         * cannot reuse it, fall through to a fresh enqueue */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        if (intent) {
                /* intent enqueues need a request with room for the LVB in
                 * the reply */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* stash everything osc_enqueue_interpret() needs in
                         * the request's async args */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue a DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2222
2223 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2224                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2225                    __u64 *flags, void *data, struct lustre_handle *lockh,
2226                    int unref)
2227 {
2228         struct obd_device *obd = exp->exp_obd;
2229         __u64 lflags = *flags;
2230         ldlm_mode_t rc;
2231         ENTRY;
2232
2233         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2234                 RETURN(-EIO);
2235
2236         /* Filesystem lock extents are extended to page boundaries so that
2237          * dealing with the page cache is a little smoother */
2238         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2239         policy->l_extent.end |= ~PAGE_MASK;
2240
2241         /* Next, search for already existing extent locks that will cover us */
2242         /* If we're trying to read, we also search for an existing PW lock.  The
2243          * VFS and page cache already protect us locally, so lots of readers/
2244          * writers can share a single PW lock. */
2245         rc = mode;
2246         if (mode == LCK_PR)
2247                 rc |= LCK_PW;
2248         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2249                              res_id, type, policy, rc, lockh, unref);
2250         if (rc) {
2251                 if (data != NULL) {
2252                         if (!osc_set_data_with_check(lockh, data)) {
2253                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2254                                         ldlm_lock_decref(lockh, rc);
2255                                 RETURN(0);
2256                         }
2257                 }
2258                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2259                         ldlm_lock_addref(lockh, LCK_PR);
2260                         ldlm_lock_decref(lockh, LCK_PW);
2261                 }
2262                 RETURN(rc);
2263         }
2264         RETURN(rc);
2265 }
2266
2267 static int osc_statfs_interpret(const struct lu_env *env,
2268                                 struct ptlrpc_request *req,
2269                                 struct osc_async_args *aa, int rc)
2270 {
2271         struct obd_statfs *msfs;
2272         ENTRY;
2273
2274         if (rc == -EBADR)
2275                 /* The request has in fact never been sent
2276                  * due to issues at a higher level (LOV).
2277                  * Exit immediately since the caller is
2278                  * aware of the problem and takes care
2279                  * of the clean up */
2280                  RETURN(rc);
2281
2282         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2283             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2284                 GOTO(out, rc = 0);
2285
2286         if (rc != 0)
2287                 GOTO(out, rc);
2288
2289         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2290         if (msfs == NULL) {
2291                 GOTO(out, rc = -EPROTO);
2292         }
2293
2294         *aa->aa_oi->oi_osfs = *msfs;
2295 out:
2296         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2297         RETURN(rc);
2298 }
2299
/*
 * Send an OST_STATFS request asynchronously; the reply is delivered to
 * oinfo->oi_cb_up() via osc_statfs_interpret().  \a max_age is currently
 * not transmitted on the wire (see the comment below).
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated statfs must not wait on a resend or on
                 * an unreachable target, to avoid a deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2343
/*
 * Fetch filesystem usage information from the OST synchronously.
 * \a flags may contain OBD_STATFS_NODELAY to fail fast rather than wait
 * for an unreachable target; \a max_age is currently not transmitted
 * (see the comment below).
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* Since the request might also come from lprocfs, we need to sync
         * this with client_disconnect_export(): take an import reference
         * under cl_sem so the import cannot vanish under us.  Bug15684 */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request holds its own import reference now. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated statfs must not wait on a resend or on
                 * an unreachable target, to avoid a deadlock. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2407
/*
 * Handle ioctls addressed to an OSC device: import recovery,
 * (de)activation, quotacheck polling and target ping.  Unknown commands
 * return -ENOTTY.
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* Pin this module so it cannot be unloaded while a handler runs. */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive return is informational; report success */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2447
/*
 * Handle obd_set_info_async() for the OSC layer.
 *
 * Several keys are handled locally (checksum toggle, sptlrpc config and
 * context flush, LRU cache registration and shrinking); everything else
 * is packed into an OST_SET_INFO RPC (OST_SET_GRANT_INFO format for
 * grant shrink) and sent to the OST.
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key,
                              u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Enable/disable data checksumming for this client. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Attach this OSC to the client-wide LRU cache shared with other
         * OSCs; done at most once per device. */
        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        /* Shrink this OSC's share of the LRU: free up to half of the pages
         * on its LRU list, bounded by the requested target, and subtract
         * the number actually freed from the caller's counter. */
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* Grant shrink carries its value in an ost_body field rather than
         * the generic setinfo value buffer. */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                /* Keep a private copy of the obdo for the reply handler
                 * (presumably released by osc_shrink_grant_interpret —
                 * confirm against that function). */
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                /* Caller-supplied set: add and kick it once. */
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2566
/*
 * Refresh connect data on reconnect: recompute the grant to request from
 * the server and zero the grant lost while disconnected (it is reported
 * in the debug log only).
 */
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                spin_lock(&cli->cl_loi_list_lock);
                /* Request what we currently hold: available grant plus the
                 * grant backing dirty pages; if that is zero, fall back to
                 * two full-sized BRWs (GNU "?:" keeps the first operand
                 * when non-zero). */
                data->ocd_grant = (cli->cl_avail_grant +
                                  (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
                                  2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}
2593
/*
 * Disconnect the export; once client_disconnect_export() has destroyed
 * the import, take this client off the grant-shrink list (the ordering
 * rationale is in the comment below, BUG18662).
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2621
/*
 * React to import state changes reported by the ptlrpc layer and forward
 * a matching notification to the observer obd where applicable.
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Connection lost: zero the grant counters under the LOI
                 * list lock. */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        /* Drop all locally cached DLM locks. */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data negotiated: (re)initialize grant accounting
                 * and switch to the OST request portal if supported. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2695
2696 /**
2697  * Determine whether the lock can be canceled before replaying the lock
2698  * during recovery, see bug16774 for detailed information.
2699  *
2700  * \retval zero the lock can't be canceled
2701  * \retval other ok to cancel
2702  */
2703 static int osc_cancel_weight(struct ldlm_lock *lock)
2704 {
2705         /*
2706          * Cancel all unused and granted extent lock.
2707          */
2708         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2709             lock->l_granted_mode == lock->l_req_mode &&
2710             osc_ldlm_weigh_ast(lock) == 0)
2711                 RETURN(1);
2712
2713         RETURN(0);
2714 }
2715
/*
 * ptlrpcd work-item callback: flush pending bulk I/O for \a data (a
 * struct client_obd) by kicking osc_io_unplug() on the same ptlrpcd
 * partition (PDL_POLICY_SAME).
 */
static int brw_queue_work(const struct lu_env *env, void *data)
{
        struct client_obd *cli = data;

        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(0);
}
2725
/*
 * Set up an OSC obd device: the client import, the ptlrpcd writeback and
 * LRU work items, quota state, procfs entries, a reserve request pool for
 * brw resends, and the grant-shrink / lock-cancel hooks.
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Writeback flushing, run from ptlrpcd (see brw_queue_work()). */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* Background LRU reclaim, also run from ptlrpcd. */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest.  Note: a proc failure is non-fatal here —
         * setup continues and still returns 0 below. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2817
/*
 * Staged teardown run before osc_cleanup():
 *  - OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 *  - OBD_CLEANUP_EXPORTS destroys the ptlrpcd work items, cleans up the
 *    client import and unregisters the procfs entries.
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
                }
        }
        RETURN(rc);
}
2863
/*
 * Final OSC teardown: detach from the shared client LRU cache (if this
 * OSC was registered via KEY_CACHE_SET), free the quota cache, clean up
 * the client obd, and drop the ptlrpcd reference taken in osc_setup().
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
2890
2891 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2892 {
2893         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2894         return rc > 0 ? 0: rc;
2895 }
2896
/*
 * obd_ops entry point: forward a config record to the common OSC handler.
 * \a len is unused here.
 */
static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
2901
/*
 * Method table wiring this OSC implementation into the generic obd
 * framework; connection handling is delegated to the common client_*
 * helpers, everything else to the osc_* functions in this module.
 */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_setattr              = osc_setattr,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
2925
2926 static int __init osc_init(void)
2927 {
2928         bool enable_proc = true;
2929         struct obd_type *type;
2930         int rc;
2931         ENTRY;
2932
2933         /* print an address of _any_ initialized kernel symbol from this
2934          * module, to allow debugging with gdb that doesn't support data
2935          * symbols from modules.*/
2936         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2937
2938         rc = lu_kmem_init(osc_caches);
2939         if (rc)
2940                 RETURN(rc);
2941
2942         type = class_search_type(LUSTRE_OSP_NAME);
2943         if (type != NULL && type->typ_procsym != NULL)
2944                 enable_proc = false;
2945
2946         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
2947                                  LUSTRE_OSC_NAME, &osc_device_type);
2948         if (rc) {
2949                 lu_kmem_fini(osc_caches);
2950                 RETURN(rc);
2951         }
2952
2953         RETURN(rc);
2954 }
2955
/* Module unload: unregister the OSC obd type and release the caches
 * created in osc_init(). */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
2961
/* Standard kernel module metadata and entry points. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);