Whamcloud - gitweb
LU-5823 clio: add coo_obd_info_get and coo_data_version
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC state for an asynchronous bulk read/write (BRW) request.
 * Stored in the request's rq_async_args and consumed by brw_interpret()
 * when the reply arrives. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;        /* attributes sent with the BRW */
        int                       aa_requested_nob; /* total bytes requested */
        int                       aa_nio_count; /* I/O segment (niobuf) count */
        obd_count                 aa_page_count; /* entries in aa_ppga */
        int                       aa_resends;   /* resend attempts so far */
        struct brw_page **aa_ppga;              /* page descriptors for the bulk */
        struct client_obd        *aa_cli;       /* client obd owning this RPC */
        struct list_head          aa_oaps;      /* async pages in this RPC */
        struct list_head          aa_exts;      /* extents covered by this RPC */
        struct obd_capa  *aa_ocapa;             /* capability, may be NULL */
        struct cl_req            *aa_clerq;     /* cl_req transfer descriptor */
};

/* Grant-shrink RPCs reuse the BRW argument layout; only aa_oa is used
 * by osc_shrink_grant_interpret(). */
#define osc_grant_args osc_brw_async_args
70
/* Async setattr/punch completion state: the obdo to unpack the reply
 * into, plus the caller's upcall and its opaque cookie. */
struct osc_setattr_args {
        struct obdo             *sa_oa;         /* attrs; updated from reply */
        obd_enqueue_update_f     sa_upcall;     /* completion callback */
        void                    *sa_cookie;     /* opaque argument for upcall */
};
76
/* Async OST_SYNC completion state, consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;                 /* oi_oa receives reply attrs */
        obd_enqueue_update_f     fa_upcall;     /* completion callback */
        void                    *fa_cookie;     /* opaque argument for upcall */
};
82
/* State for an asynchronous LDLM lock enqueue on an OST object. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;        /* export the lock is taken on */
        ldlm_type_t             oa_type;        /* LDLM lock type */
        ldlm_mode_t             oa_mode;        /* requested lock mode */
        __u64                   *oa_flags;      /* in/out LDLM flags */
        osc_enqueue_upcall_f    oa_upcall;      /* completion callback */
        void                    *oa_cookie;     /* opaque argument for upcall */
        struct ost_lvb          *oa_lvb;        /* lock value block from server */
        struct lustre_handle    oa_lockh;       /* handle of the granted lock */
        unsigned int            oa_agl:1;       /* AGL (async glimpse lock) request */
};

/* Forward declarations for the BRW completion path defined later. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
98
99 static inline void osc_pack_capa(struct ptlrpc_request *req,
100                                  struct ost_body *body, void *capa)
101 {
102         struct obd_capa *oc = (struct obd_capa *)capa;
103         struct lustre_capa *c;
104
105         if (!capa)
106                 return;
107
108         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
109         LASSERT(c);
110         capa_cpy(c, oc);
111         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
112         DEBUG_CAPA(D_SEC, c, "pack");
113 }
114
/* Pack the caller's attributes (and optional capability) from @oinfo
 * into the request's OST_BODY buffer, converting the obdo to wire
 * format according to the import's connect data. */
void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
126
127 void osc_set_capa_size(struct ptlrpc_request *req,
128                        const struct req_msg_field *field,
129                        struct obd_capa *oc)
130 {
131         if (oc == NULL)
132                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
133         else
134                 /* it is already calculated as sizeof struct obd_capa */
135                 ;
136 }
137
/* Reply interpreter for async OST_GETATTR: unpack the reply's ost_body
 * into the caller's obdo, then invoke the obd_info completion upcall.
 * The upcall runs even when the RPC itself failed; its return value is
 * what this interpreter returns. */
int osc_getattr_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req,
                          struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                /* Reply buffer missing/short: protocol error; invalidate
                 * any attrs the caller might otherwise trust. */
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
166
/* Queue an asynchronous OST_GETATTR on @set.  The reply is handled by
 * osc_getattr_interpret(), which fills oinfo->oi_oa and fires
 * oinfo->oi_cb_up.  Returns 0 on successful queueing or a negative
 * errno if the request could not be allocated/packed. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Capability field must be sized before packing the request. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* Async args live inside the request; make sure they fit. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
198
/* Synchronous OST_GETATTR: send the RPC, wait for the reply, and unpack
 * the returned attributes into oinfo->oi_oa.  Returns 0 or a negative
 * errno. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* Blocksize is client-side knowledge, not sent by the OST. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
242
243 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
244                        struct obd_info *oinfo, struct obd_trans_info *oti)
245 {
246         struct ptlrpc_request *req;
247         struct ost_body       *body;
248         int                    rc;
249         ENTRY;
250
251         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
252
253         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
254         if (req == NULL)
255                 RETURN(-ENOMEM);
256
257         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
258         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
259         if (rc) {
260                 ptlrpc_request_free(req);
261                 RETURN(rc);
262         }
263
264         osc_pack_req_body(req, oinfo);
265
266         ptlrpc_request_set_replen(req);
267
268         rc = ptlrpc_queue_wait(req);
269         if (rc)
270                 GOTO(out, rc);
271
272         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
273         if (body == NULL)
274                 GOTO(out, rc = -EPROTO);
275
276         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
277                              &body->oa);
278
279         EXIT;
280 out:
281         ptlrpc_req_finished(req);
282         RETURN(rc);
283 }
284
/* Reply interpreter for async OST_SETATTR/OST_PUNCH: unpack the reply's
 * attributes into sa_oa and fire the caller's upcall.  The upcall runs
 * even on RPC error; its return value is returned. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
305
/* Send an asynchronous OST_SETATTR.  Behavior depends on @rqset:
 *   NULL         - fire-and-forget via ptlrpcd, no upcall on completion;
 *   PTLRPCD_SET  - queued on ptlrpcd, @upcall(@cookie, rc) on completion;
 *   otherwise    - added to the caller's set, upcall on completion.
 * If @oti carries a log cookie (OBD_MD_FLCOOKIE set), it is copied into
 * the obdo before packing.  Returns 0 on queueing or a negative errno. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
356
/* OBD-method wrapper: async setattr using the obd_info's own callback
 * (oi_cb_up) as the completion upcall with oinfo itself as cookie. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}
364
/* Synchronous OST_CREATE for echo-client objects (only echo sequences
 * are accepted, see the LASSERT).  On success the server-assigned
 * attributes are unpacked back into @oa, a client-side blocksize is
 * filled in, and any returned llog cookie is saved into @oti.
 * Returns 0 or a negative errno. */
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Blocksize is client-side knowledge, not sent by the OST. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* Save the unlink llog cookie for the caller, defaulting to the
         * oti's embedded single-cookie slot if none was provided. */
        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}
432
/* Send an asynchronous OST_PUNCH (truncate/hole-punch).  The extent to
 * punch is carried in oinfo->oi_oa (size/blocks fields).  Completion is
 * handled by osc_setattr_interpret(), which unpacks the reply into the
 * same obdo and fires @upcall(@cookie, rc).  @rqset may be PTLRPCD_SET
 * for ptlrpcd dispatch or a caller-owned set.  Returns 0 on queueing
 * or a negative errno. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
477
/* Reply interpreter for async OST_SYNC: copy the reply's obdo into the
 * caller's obd_info and fire the upcall (which runs even on error). */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
500
/* Send an asynchronous OST_SYNC for the byte range encoded in
 * oinfo->oi_oa (size/blocks carry start/end).  Completion is handled by
 * osc_sync_interpret(), which fires @upcall(@cookie, rc).  @rqset may
 * be PTLRPCD_SET or a caller-owned set.  Returns 0 on queueing or a
 * negative errno. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
545
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* Build the resource name from the object id; a missing resource
         * simply means there is nothing to cancel. */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
580
/* Completion handler for throttled OST_DESTROY RPCs: release the
 * in-flight slot and wake one waiter in osc_destroy().  The RPC result
 * is deliberately ignored (see the comment above osc_destroy()). */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}
591
592 static int osc_can_send_destroy(struct client_obd *cli)
593 {
594         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
595             cli->cl_max_rpcs_in_flight) {
596                 /* The destroy request can be sent */
597                 return 1;
598         }
599         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
600             cli->cl_max_rpcs_in_flight) {
601                 /*
602                  * The counter has been modified between the two atomic
603                  * operations.
604                  */
605                 wake_up(&cli->cl_destroy_waitq);
606         }
607         return 0;
608 }
609
610 /* Destroy requests can be async always on the client, and we don't even really
611  * care about the return code since the client cannot do anything at all about
612  * a destroy failure.
613  * When the MDS is unlinking a filename, it saves the file objects into a
614  * recovery llog, and these object records are cancelled when the OST reports
615  * they were destroyed and sync'd to disk (i.e. transaction committed).
616  * If the client dies, or the OST is down when the object should be destroyed,
617  * the records are not cancelled, and when the OST reconnects to the MDS next,
618  * it will retrieve the llog unlink logs and then sends the log cancellation
619  * cookies to the MDS after committing destroy transactions. */
620 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
621                        struct obdo *oa, struct obd_trans_info *oti)
622 {
623         struct client_obd     *cli = &exp->exp_obd->u.cli;
624         struct ptlrpc_request *req;
625         struct ost_body       *body;
626         struct list_head       cancels = LIST_HEAD_INIT(cancels);
627         int rc, count;
628         ENTRY;
629
630         if (!oa) {
631                 CDEBUG(D_INFO, "oa NULL\n");
632                 RETURN(-EINVAL);
633         }
634
635         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
636                                         LDLM_FL_DISCARD_DATA);
637
638         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
639         if (req == NULL) {
640                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
641                 RETURN(-ENOMEM);
642         }
643
644         osc_set_capa_size(req, &RMF_CAPA1, NULL);
645         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
646                                0, &cancels, count);
647         if (rc) {
648                 ptlrpc_request_free(req);
649                 RETURN(rc);
650         }
651
652         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
653         ptlrpc_at_set_req_timeout(req);
654
655         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
656                 oa->o_lcookie = *oti->oti_logcookies;
657         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
658         LASSERT(body);
659         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
660
661         ptlrpc_request_set_replen(req);
662
663         /* If osc_destory is for destroying the unlink orphan,
664          * sent from MDT to OST, which should not be blocked here,
665          * because the process might be triggered by ptlrpcd, and
666          * it is not good to block ptlrpcd thread (b=16006)*/
667         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
668                 req->rq_interpret_reply = osc_destroy_interpret;
669                 if (!osc_can_send_destroy(cli)) {
670                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
671                                                           NULL);
672
673                         /*
674                          * Wait until the number of on-going destroy RPCs drops
675                          * under max_rpc_in_flight
676                          */
677                         l_wait_event_exclusive(cli->cl_destroy_waitq,
678                                                osc_can_send_destroy(cli), &lwi);
679                 }
680         }
681
682         /* Do not wait for response */
683         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
684         RETURN(0);
685 }
686
/* Fill o_dirty/o_undirty/o_grant/o_dropped in @oa from the client's
 * cache accounting (under cl_loi_list_lock) so the server can adjust
 * its grant.  o_undirty advertises how much more the client could
 * dirty; it is forced to 0 when the accounting looks inconsistent.
 * @writing_bytes is currently unused here. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* Advertise room for at least a full pipeline of RPCs. */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
735
/* Schedule the next grant-shrink check cl_grant_shrink_interval seconds
 * from now. */
void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}
743
/* Add @grant bytes to the client's available grant, under the grant
 * accounting lock. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}
750
751 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
752 {
753         if (body->oa.o_valid & OBD_MD_FLGRANT) {
754                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
755                 __osc_update_grant(cli, body->oa.o_grant);
756         }
757 }
758
/* Forward declaration: the set_info path used to send grant-shrink RPCs. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);

/* Completion handler for a grant-shrink RPC.  On failure the grant we
 * tried to give back is restored locally; on success the server's
 * reply grant is folded in.  The obdo allocated by the sender is freed
 * here in both cases. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* Shrink failed: take the grant back. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
783
/* Move a quarter of the available grant into @oa (to be returned to the
 * server with the next RPC), mark it with OBD_FL_SHRINK_GRANT, and
 * reschedule the next shrink interval. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}
797
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        /* Already at (or below) the pipeline target: drop to one RPC. */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
814
/* Give grant back to the server until only @target_bytes remain locally.
 * The target is clamped to at least one full RPC's worth; if we already
 * hold no more than the target, nothing is sent.  The surplus is handed
 * to the server via a KEY_GRANT_SHRINK set_info RPC; on failure to send,
 * the grant is restored locally.  Returns 0 or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        /* Compute the surplus under the lock; avail may have changed
         * since the check above. */
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* RPC not sent: take the grant back. */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
859
860 static int osc_should_shrink_grant(struct client_obd *client)
861 {
862         cfs_time_t time = cfs_time_current();
863         cfs_time_t next_shrink = client->cl_next_shrink_grant;
864
865         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
866              OBD_CONNECT_GRANT_SHRINK) == 0)
867                 return 0;
868
869         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
870                 /* Get the current RPC size directly, instead of going via:
871                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
872                  * Keep comment here so that it can be found by searching. */
873                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
874
875                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
876                     client->cl_avail_grant > brw_size)
877                         return 1;
878                 else
879                         osc_update_next_shrink(client);
880         }
881         return 0;
882 }
883
884 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
885 {
886         struct client_obd *client;
887
888         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
889                 if (osc_should_shrink_grant(client))
890                         osc_shrink_grant(client);
891         }
892         return 0;
893 }
894
/* Register this client on the shared TIMEOUT_GRANT timer so that
 * osc_grant_shrink_grant_cb() gets called periodically for it.
 * Returns 0 on success or the error from ptlrpc_add_timeout_client(). */
static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        /* start the shrink countdown from now */
        osc_update_next_shrink(client);
        return 0;
}
913
/* Unregister this client from the TIMEOUT_GRANT shrink timer;
 * inverse of osc_add_shrink_grant(). */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
919
/* Initialize the client's available grant from the amount the server
 * granted at connect time (@ocd->ocd_grant), accounting for pages that
 * are already dirty, and derive the extent chunk size from the server's
 * block size.  Also registers for periodic grant shrinking if the server
 * supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
959
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Walk the page array: leave the @nob_read bytes that arrived intact,
 * zero-fill the tail of the page where EOF fell, then zero every page
 * after it so the reader sees holes as zeroes. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
998
/* Validate the per-niobuf return codes the server sent back for a bulk
 * write: the RC vector must be present and the right size, every entry
 * must be 0 (negative entries are forwarded as the error), and the bulk
 * byte count must match what we asked for.  Returns 0 if all is well,
 * a negative errno otherwise. */
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __u32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                /* rcs are on the wire as __u32; cast to spot negative codes */
                if ((int)remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}
1034
1035 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1036 {
1037         if (p1->flag != p2->flag) {
1038                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1039                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1040                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1041
1042                 /* warn if we try to combine flags that we don't know to be
1043                  * safe to combine */
1044                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1045                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1046                               "report this at https://jira.hpdd.intel.com/\n",
1047                               p1->flag, p2->flag);
1048                 }
1049                 return 0;
1050         }
1051
1052         return (p1->off + p1->count == p2->off);
1053 }
1054
/* Compute the bulk checksum over up to @nob bytes of the page array
 * using the algorithm selected by @cksum_type.  Honors the
 * OBD_FAIL_OSC_CHECKSUM_RECEIVE/SEND fault-injection points so checksum
 * failures can be simulated in tests.  Returns the checksum value, or a
 * PTR_ERR-style negative value if the hash cannot be initialized. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* only the last page may be partially covered by nob */
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        /* NOTE(review): err from cfs_crypto_hash_final is ignored here;
         * presumably a failure leaves cksum unset — confirm whether it
         * should be checked */
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1109
/* Build (but do not send) a bulk read/write RPC for the sorted page array
 * @pga: allocate the request, merge contiguous pages into remote niobufs,
 * register the bulk descriptor, pack the obdo/ioobj/capa, attach grant and
 * checksum information, and stash the async args used on completion.
 * On success *reqp holds the prepared request and 0 is returned; on error
 * the request is freed and a negative errno is returned.  @resend marks
 * the RPC as a recovery resend; @reserve pins @ocapa in the async args. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                /* writes come from the preallocated pool so that dirty
                 * flushing cannot deadlock on memory allocation */
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many remote niobufs we need: adjacent mergeable pages
         * share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                /* pages must arrive sorted by ascending file offset */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting a
                         * new one */
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* opportunistically return excess grant with this RPC */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1316
/* Compare the server's write checksum against the one we computed when
 * the RPC was built.  On mismatch, recompute the checksum over the pages
 * as they are now to diagnose where the corruption happened (client-side
 * after checksumming, in transit, or a checksum-type mismatch) and log it.
 * Returns 0 when the checksums match, 1 on mismatch (caller resends). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* the server reports the checksum type it actually used in o_flags */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1361
1362 /* Note rc enters this function as number of bytes transferred */
1363 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1364 {
1365         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1366         const lnet_process_id_t *peer =
1367                         &req->rq_import->imp_connection->c_peer;
1368         struct client_obd *cli = aa->aa_cli;
1369         struct ost_body *body;
1370         u32 client_cksum = 0;
1371         ENTRY;
1372
1373         if (rc < 0 && rc != -EDQUOT) {
1374                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1375                 RETURN(rc);
1376         }
1377
1378         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1379         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1380         if (body == NULL) {
1381                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1382                 RETURN(-EPROTO);
1383         }
1384
1385         /* set/clear over quota flag for a uid/gid */
1386         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1387             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1388                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1389
1390                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1391                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1392                        body->oa.o_flags);
1393                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1394         }
1395
1396         osc_update_grant(cli, body);
1397
1398         if (rc < 0)
1399                 RETURN(rc);
1400
1401         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1402                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1403
1404         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1405                 if (rc > 0) {
1406                         CERROR("Unexpected +ve rc %d\n", rc);
1407                         RETURN(-EPROTO);
1408                 }
1409                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1410
1411                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1412                         RETURN(-EAGAIN);
1413
1414                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1415                     check_write_checksum(&body->oa, peer, client_cksum,
1416                                          body->oa.o_cksum, aa->aa_requested_nob,
1417                                          aa->aa_page_count, aa->aa_ppga,
1418                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1419                         RETURN(-EAGAIN);
1420
1421                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1422                                      aa->aa_page_count, aa->aa_ppga);
1423                 GOTO(out, rc);
1424         }
1425
1426         /* The rest of this function executes only for OST_READs */
1427
1428         /* if unwrap_bulk failed, return -EAGAIN to retry */
1429         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1430         if (rc < 0)
1431                 GOTO(out, rc = -EAGAIN);
1432
1433         if (rc > aa->aa_requested_nob) {
1434                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1435                        aa->aa_requested_nob);
1436                 RETURN(-EPROTO);
1437         }
1438
1439         if (rc != req->rq_bulk->bd_nob_transferred) {
1440                 CERROR ("Unexpected rc %d (%d transferred)\n",
1441                         rc, req->rq_bulk->bd_nob_transferred);
1442                 return (-EPROTO);
1443         }
1444
1445         if (rc < aa->aa_requested_nob)
1446                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1447
1448         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1449                 static int cksum_counter;
1450                 u32        server_cksum = body->oa.o_cksum;
1451                 char      *via = "";
1452                 char      *router = "";
1453                 cksum_type_t cksum_type;
1454
1455                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1456                                                body->oa.o_flags : 0);
1457                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1458                                                  aa->aa_ppga, OST_READ,
1459                                                  cksum_type);
1460
1461                 if (peer->nid != req->rq_bulk->bd_sender) {
1462                         via = " via ";
1463                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1464                 }
1465
1466                 if (server_cksum != client_cksum) {
1467                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1468                                            "%s%s%s inode "DFID" object "DOSTID
1469                                            " extent ["LPU64"-"LPU64"]\n",
1470                                            req->rq_import->imp_obd->obd_name,
1471                                            libcfs_nid2str(peer->nid),
1472                                            via, router,
1473                                            body->oa.o_valid & OBD_MD_FLFID ?
1474                                                 body->oa.o_parent_seq : (__u64)0,
1475                                            body->oa.o_valid & OBD_MD_FLFID ?
1476                                                 body->oa.o_parent_oid : 0,
1477                                            body->oa.o_valid & OBD_MD_FLFID ?
1478                                                 body->oa.o_parent_ver : 0,
1479                                            POSTID(&body->oa.o_oi),
1480                                            aa->aa_ppga[0]->off,
1481                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1482                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1483                                                                         1);
1484                         CERROR("client %x, server %x, cksum_type %x\n",
1485                                client_cksum, server_cksum, cksum_type);
1486                         cksum_counter = 0;
1487                         aa->aa_oa->o_cksum = client_cksum;
1488                         rc = -EAGAIN;
1489                 } else {
1490                         cksum_counter++;
1491                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1492                         rc = 0;
1493                 }
1494         } else if (unlikely(client_cksum)) {
1495                 static int cksum_missed;
1496
1497                 cksum_missed++;
1498                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1499                         CERROR("Checksum %u requested from %s but not sent\n",
1500                                cksum_missed, libcfs_nid2str(peer->nid));
1501         } else {
1502                 rc = 0;
1503         }
1504 out:
1505         if (rc >= 0)
1506                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1507                                      aa->aa_oa, &body->oa);
1508
1509         RETURN(rc);
1510 }
1511
/*
 * Rebuild and resend a bulk RPC that failed with a recoverable error
 * (e.g. -EINPROGRESS from the server, or a checksum mismatch).
 *
 * A fresh request is prepared from the same pages; it inherits the old
 * request's interpret/commit callbacks, async args and import
 * generation, then takes over the pga and oap lists and the capability.
 *
 * \param request  the failed BRW request
 * \param aa       its async args (pages, extents, resend count, ...)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0       new request queued to ptlrpcd
 * \retval -EINTR  an async page of the old request was interrupted
 * \retval <0      osc_brw_prep_request() failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS is an expected retry condition, not an error. */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* Give up (and free the new request) if any page of the old
         * request was interrupted while it was in flight. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* Any oap that held a reference on the old request now
         * references the new one instead. */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* Capability ownership moves to the new request's args. */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1585
1586 /*
1587  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1588  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1589  * fine for our small page arrays and doesn't require allocation.  its an
1590  * insertion sort that swaps elements that are strides apart, shrinking the
1591  * stride down until its '1' and the array is sorted.
1592  */
1593 static void sort_brw_pages(struct brw_page **array, int num)
1594 {
1595         int stride, i, j;
1596         struct brw_page *tmp;
1597
1598         if (num == 1)
1599                 return;
1600         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1601                 ;
1602
1603         do {
1604                 stride /= 3;
1605                 for (i = stride ; i < num ; i++) {
1606                         tmp = array[i];
1607                         j = i;
1608                         while (j >= stride && array[j - stride]->off > tmp->off) {
1609                                 array[j] = array[j - stride];
1610                                 j -= stride;
1611                         }
1612                         array[j] = tmp;
1613                 }
1614         } while (stride > 1);
1615 }
1616
/* Free a brw_page pointer array of @count entries (allocated with
 * OBD_ALLOC in osc_build_rpc()); the pages it points at are owned by
 * their osc_async_pages and are not touched here. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1622
/*
 * Reply interpreter (rq_interpret_reply) for BRW RPCs built by
 * osc_build_rpc().  Resends on recoverable errors, pushes server
 * returned attributes into the cl_object on success, finishes every
 * extent of the RPC and releases the per-RPC resources.
 *
 * \param data  the osc_brw_async_args stored in the request
 * \param rc    RPC status; refined by osc_brw_fini_request() below
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* Import was evicted/reconnected since the send;
                         * don't resend across the eviction. */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 here means a redo request was queued and it
                 * took over this RPC's pages and extents; we are done. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* Fold the attributes the server marked valid into the
                 * cached cl_object attributes, under the attr lock. */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_update(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* Successfully written pages stay "unstable" until the server
         * commits the transaction; see also brw_commit(). */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        spin_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        spin_unlock(&cli->cl_loi_list_lock);

        /* An RPC slot just freed up: try to launch queued IO. */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1746
/*
 * Commit callback (rq_commit_cb) for a BRW request, invoked when the
 * server transaction is known to be committed, so the written pages
 * no longer count as "unstable".
 */
static void brw_commit(struct ptlrpc_request *req)
{
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this called via the rq_commit_cb, I need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        spin_lock(&req->rq_lock);
        if (likely(req->rq_unstable)) {
                req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);

                osc_dec_unstable_pages(req);
        } else {
                /* The pages were not accounted as unstable yet; record
                 * that commit already happened so the other side of the
                 * race (see note above) can do the decrement itself. */
                req->rq_committed = 1;
                spin_unlock(&req->rq_lock);
        }
}
1764
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * \param cli       the client_obd the RPC is sent on behalf of
 * \param ext_list  extents to send; ownership moves to the request
 * \param cmd       OBD_BRW_WRITE or OBD_BRW_READ (tested via the
 *                  OBD_BRW_WRITE bit)
 * \param pol       ptlrpcd thread selection policy
 *
 * \retval 0 on success (request handed to ptlrpcd), negative errno
 * otherwise; on error the extents are finished with the error code.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
        struct ost_body                 *body;
        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* Track the byte range the RPC covers; interior
                         * pages must be full pages. */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* Fill the brw_page array and attach each page to the cl_req;
         * the cl_req is allocated lazily from the first page. */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        crattr->cra_oa = &body->oa;
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        /* Hand ownership of the oap and extent lists to the request's
         * async args; brw_interpret() will release them. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* Account the RPC in flight and feed the lprocfs histograms. */
        spin_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
1982
1983 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
1984                                         struct ldlm_enqueue_info *einfo)
1985 {
1986         void *data = einfo->ei_cbdata;
1987         int set = 0;
1988
1989         LASSERT(lock != NULL);
1990         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
1991         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
1992         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
1993         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
1994
1995         lock_res_and_lock(lock);
1996
1997         if (lock->l_ast_data == NULL)
1998                 lock->l_ast_data = data;
1999         if (lock->l_ast_data == data)
2000                 set = 1;
2001
2002         unlock_res_and_lock(lock);
2003
2004         return set;
2005 }
2006
2007 static int osc_set_data_with_check(struct lustre_handle *lockh,
2008                                    struct ldlm_enqueue_info *einfo)
2009 {
2010         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2011         int set = 0;
2012
2013         if (lock != NULL) {
2014                 set = osc_set_lock_data_with_check(lock, einfo);
2015                 LDLM_LOCK_PUT(lock);
2016         } else
2017                 CERROR("lockh %p, data %p - client evicted?\n",
2018                        lockh, einfo->ei_cbdata);
2019         return set;
2020 }
2021
/*
 * Common tail of lock enqueue: massage the server reply for intent
 * enqueues, mark the LVB ready when appropriate, invoke the caller's
 * upcall with the final status, and drop the lock reference taken by
 * ldlm_cli_enqueue() for a granted lock.
 *
 * \param agl      non-zero for asynchronous glimpse locks, which do
 *                 not want LDLM_FL_LVB_READY set on an aborted intent
 * \param errcode  enqueue result (ELDLM_* or negative errno)
 *
 * \retval the upcall's return value
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* The real intent status travels in lock_policy_res1. */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
2059
/*
 * Interpret callback for an asynchronous lock enqueue: completes the
 * LDLM side via ldlm_cli_enqueue_fini() and then the OSC side via
 * osc_enqueue_fini(), holding an extra lock reference across the
 * upcall so a blocking AST cannot arrive before the upcall has run.
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        if (aa->oa_agl) {
                /* AGL enqueues pass no lvb/flags; substitute a local
                 * flags word so the fini calls have something to fill. */
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Drop the extra reference taken above and the one from
         * ldlm_handle2lock(). */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2112
/* Sentinel request-set pointer (never dereferenced): callers pass it
 * to request an enqueue via the ptlrpcd daemons instead of supplying a
 * real ptlrpc_request_set of their own. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2114
2115 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2116  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2117  * other synchronous requests, however keeping some locks and trying to obtain
2118  * others may take a considerable amount of time in a case of ost failure; and
2119  * when other sync requests do not get released lock from a client, the client
2120  * is evicted from the cluster -- such scenarious make the life difficult, so
2121  * release locks just after they are obtained. */
2122 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2123                      __u64 *flags, ldlm_policy_data_t *policy,
2124                      struct ost_lvb *lvb, int kms_valid,
2125                      osc_enqueue_upcall_f upcall, void *cookie,
2126                      struct ldlm_enqueue_info *einfo,
2127                      struct ptlrpc_request_set *rqset, int async, int agl)
2128 {
2129         struct obd_device *obd = exp->exp_obd;
2130         struct lustre_handle lockh = { 0 };
2131         struct ptlrpc_request *req = NULL;
2132         int intent = *flags & LDLM_FL_HAS_INTENT;
2133         __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
2134         ldlm_mode_t mode;
2135         int rc;
2136         ENTRY;
2137
2138         /* Filesystem lock extents are extended to page boundaries so that
2139          * dealing with the page cache is a little smoother.  */
2140         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2141         policy->l_extent.end |= ~CFS_PAGE_MASK;
2142
2143         /*
2144          * kms is not valid when either object is completely fresh (so that no
2145          * locks are cached), or object was evicted. In the latter case cached
2146          * lock cannot be used, because it would prime inode state with
2147          * potentially stale LVB.
2148          */
2149         if (!kms_valid)
2150                 goto no_match;
2151
2152         /* Next, search for already existing extent locks that will cover us */
2153         /* If we're trying to read, we also search for an existing PW lock.  The
2154          * VFS and page cache already protect us locally, so lots of readers/
2155          * writers can share a single PW lock.
2156          *
2157          * There are problems with conversion deadlocks, so instead of
2158          * converting a read lock to a write lock, we'll just enqueue a new
2159          * one.
2160          *
2161          * At some point we should cancel the read lock instead of making them
2162          * send us a blocking callback, but there are problems with canceling
2163          * locks out from other users right now, too. */
2164         mode = einfo->ei_mode;
2165         if (einfo->ei_mode == LCK_PR)
2166                 mode |= LCK_PW;
2167         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2168                                einfo->ei_type, policy, mode, &lockh, 0);
2169         if (mode) {
2170                 struct ldlm_lock *matched;
2171
2172                 if (*flags & LDLM_FL_TEST_LOCK)
2173                         RETURN(ELDLM_OK);
2174
2175                 matched = ldlm_handle2lock(&lockh);
2176                 if (agl) {
2177                         /* AGL enqueues DLM locks speculatively. Therefore if
2178                          * it already exists a DLM lock, it wll just inform the
2179                          * caller to cancel the AGL process for this stripe. */
2180                         ldlm_lock_decref(&lockh, mode);
2181                         LDLM_LOCK_PUT(matched);
2182                         RETURN(-ECANCELED);
2183                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2184                         *flags |= LDLM_FL_LVB_READY;
2185
2186                         /* We already have a lock, and it's referenced. */
2187                         (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2188
2189                         ldlm_lock_decref(&lockh, mode);
2190                         LDLM_LOCK_PUT(matched);
2191                         RETURN(ELDLM_OK);
2192                 } else {
2193                         ldlm_lock_decref(&lockh, mode);
2194                         LDLM_LOCK_PUT(matched);
2195                 }
2196         }
2197
2198 no_match:
2199         if (*flags & LDLM_FL_TEST_LOCK)
2200                 RETURN(-ENOLCK);
2201
2202         if (intent) {
2203                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2204                                            &RQF_LDLM_ENQUEUE_LVB);
2205                 if (req == NULL)
2206                         RETURN(-ENOMEM);
2207
2208                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2209                 if (rc < 0) {
2210                         ptlrpc_request_free(req);
2211                         RETURN(rc);
2212                 }
2213
2214                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2215                                      sizeof *lvb);
2216                 ptlrpc_request_set_replen(req);
2217         }
2218
2219         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2220         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2221
2222         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2223                               sizeof(*lvb), LVB_T_OST, &lockh, async);
2224         if (async) {
2225                 if (!rc) {
2226                         struct osc_enqueue_args *aa;
2227                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2228                         aa = ptlrpc_req_async_args(req);
2229                         aa->oa_exp    = exp;
2230                         aa->oa_mode   = einfo->ei_mode;
2231                         aa->oa_type   = einfo->ei_type;
2232                         lustre_handle_copy(&aa->oa_lockh, &lockh);
2233                         aa->oa_upcall = upcall;
2234                         aa->oa_cookie = cookie;
2235                         aa->oa_agl    = !!agl;
2236                         if (!agl) {
2237                                 aa->oa_flags  = flags;
2238                                 aa->oa_lvb    = lvb;
2239                         } else {
2240                                 /* AGL is essentially to enqueue an DLM lock
2241                                  * in advance, so we don't care about the
2242                                  * result of AGL enqueue. */
2243                                 aa->oa_lvb    = NULL;
2244                                 aa->oa_flags  = NULL;
2245                         }
2246
2247                         req->rq_interpret_reply =
2248                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2249                         if (rqset == PTLRPCD_SET)
2250                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2251                         else
2252                                 ptlrpc_set_add_req(rqset, req);
2253                 } else if (intent) {
2254                         ptlrpc_req_finished(req);
2255                 }
2256                 RETURN(rc);
2257         }
2258
2259         rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2260                               flags, agl, rc);
2261         if (intent)
2262                 ptlrpc_req_finished(req);
2263
2264         RETURN(rc);
2265 }
2266
/**
 * Look up an already granted DLM extent lock covering the given extent.
 *
 * \param[in]  exp    export whose namespace is searched
 * \param[in]  res_id resource (object) to match against
 * \param[in]  type   lock type (LDLM_EXTENT for OSC objects)
 * \param[in]  policy extent to cover; rounded out to page boundaries below
 * \param[in]  mode   minimum mode required (a PR request also matches PW)
 * \param[in]  flags  LDLM_FL_* match flags (e.g. LDLM_FL_TEST_LOCK)
 * \param[in]  data   opaque lock data verified via osc_set_data_with_check()
 * \param[out] lockh  handle of the matched lock on success
 * \param[in]  unref  non-zero to also match locks being destroyed
 *
 * \retval matched lock mode (non-zero) with a reference held in the
 *         requested mode, unless LDLM_FL_TEST_LOCK was set
 * \retval 0 if no suitable lock exists or the data check failed
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                   __u64 *flags, void *data, struct lustre_handle *lockh,
                   int unref)
{
        struct obd_device *obd = exp->exp_obd;
        __u64 lflags = *flags;
        ldlm_mode_t rc;
        ENTRY;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
                RETURN(-EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        rc = mode;
        if (mode == LCK_PR)
                rc |= LCK_PW;
        rc = ldlm_lock_match(obd->obd_namespace, lflags,
                             res_id, type, policy, rc, lockh, unref);
        if (rc) {
                if (data != NULL) {
                        if (!osc_set_data_with_check(lockh, data)) {
                                /* Data mismatch: drop the reference taken by
                                 * ldlm_lock_match() (unless this was a
                                 * test-only match) and report no lock found. */
                                if (!(lflags & LDLM_FL_TEST_LOCK))
                                        ldlm_lock_decref(lockh, rc);
                                RETURN(0);
                        }
                }
                /* We asked for PR but matched a PW lock: convert our
                 * reference from PW to PR so the caller can later decref
                 * with the mode it originally requested. */
                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
                RETURN(rc);
        }
        RETURN(rc);
}
2310
2311 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2312 {
2313         ENTRY;
2314
2315         if (unlikely(mode == LCK_GROUP))
2316                 ldlm_lock_decref_and_cancel(lockh, mode);
2317         else
2318                 ldlm_lock_decref(lockh, mode);
2319
2320         RETURN(0);
2321 }
2322
/* Completion callback for an async OST_STATFS RPC: copy the server's
 * obd_statfs reply into the caller's buffer and invoke the caller's
 * upcall (oi_cb_up) with the final status. */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* A non-blocking statfs (OBD_STATFS_NODELAY) treats a disconnected
         * or retry-needed import as a soft failure: report rc = 0 to the
         * upcall instead of an error. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                /* reply did not contain a statfs body */
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2355
/* Send an OST_STATFS RPC asynchronously through the given request set.
 * The reply is processed by osc_statfs_interpret(), which delivers the
 * result via oinfo->oi_cb_up().  \a max_age is not currently sent on the
 * wire (see the comment below). */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for resend or recovery,
                 * to avoid deadlocking while the import is down */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2399
/* Synchronous statfs: send OST_STATFS, wait for the reply, and copy the
 * result into *osfs.  May be invoked from lprocfs as well as regular
 * paths, hence the cl_sem protection around taking an import reference. */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* Since the request might also come from lprocfs, we need to sync
         * this with client_disconnect_export(); see Bug15684. */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* the import reference was only needed for the allocation above */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for resend or recovery,
                 * to avoid deadlocking while the import is down */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                /* reply did not contain a statfs body */
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2463
/* obd_ops::o_iocontrol hook: dispatch OSC-level ioctls.  A module
 * reference is held for the duration of the call so the module cannot be
 * unloaded while an ioctl is in flight. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive return values are informational, not errors */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2503
/* obd_ops::o_set_info_async hook.  Several keys (checksum, sptlrpc,
 * cache/LRU tuning) are handled entirely on the client; anything else is
 * packed into an OST_SET_INFO (or OST_SET_GRANT_INFO for grant shrink)
 * RPC and sent to the server. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* enable/disable wire checksums for this client */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* attach this OSC to the client-wide page cache and its LRU list */
        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                cl_cache_incref(cli->cl_cache);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        /* shrink this OSC's LRU: free at most half of our in-list pages,
         * bounded by the caller's target, and report the number actually
         * freed by decrementing *val */
        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                *(long *)val -= nr;
                RETURN(0);
        }

        /* beyond this point an RPC is required; only grant shrink may run
         * without a caller-provided request set (it goes through ptlrpcd) */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        /* grant shrink carries an obdo copy so the interpret callback can
         * process the reply; the obdo is freed by the interpret path */
        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2621
2622 static int osc_reconnect(const struct lu_env *env,
2623                          struct obd_export *exp, struct obd_device *obd,
2624                          struct obd_uuid *cluuid,
2625                          struct obd_connect_data *data,
2626                          void *localdata)
2627 {
2628         struct client_obd *cli = &obd->u.cli;
2629
2630         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2631                 long lost_grant;
2632
2633                 spin_lock(&cli->cl_loi_list_lock);
2634                 data->ocd_grant = (cli->cl_avail_grant +
2635                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2636                                   2 * cli_brw_size(obd);
2637                 lost_grant = cli->cl_lost_grant;
2638                 cli->cl_lost_grant = 0;
2639                 spin_unlock(&cli->cl_loi_list_lock);
2640
2641                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2642                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2643                        data->ocd_version, data->ocd_grant, lost_grant);
2644         }
2645
2646         RETURN(0);
2647 }
2648
/* obd_ops::o_disconnect hook: disconnect the export and, only once the
 * import has actually been destroyed, remove this client from the grant
 * shrink list (see the race described below). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2676
/* obd_ops::o_import_event hook: react to import state transitions and
 * propagate most of them to the observer (typically LOV) via
 * obd_notify_observer(). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* connection lost: any outstanding grant is no longer valid */
                cli = &obd->u.cli;
                spin_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                spin_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                /* import invalidated: fail all cached pages and drop all
                 * locks in this namespace locally */
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data negotiated: initialize grant accounting and
                 * pick the right request portal */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        case IMP_EVENT_DEACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
                break;
        }
        case IMP_EVENT_ACTIVATE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
2750
2751 /**
2752  * Determine whether the lock can be canceled before replaying the lock
2753  * during recovery, see bug16774 for detailed information.
2754  *
2755  * \retval zero the lock can't be canceled
2756  * \retval other ok to cancel
2757  */
2758 static int osc_cancel_weight(struct ldlm_lock *lock)
2759 {
2760         /*
2761          * Cancel all unused and granted extent lock.
2762          */
2763         if (lock->l_resource->lr_type == LDLM_EXTENT &&
2764             lock->l_granted_mode == lock->l_req_mode &&
2765             osc_ldlm_weigh_ast(lock) == 0)
2766                 RETURN(1);
2767
2768         RETURN(0);
2769 }
2770
2771 static int brw_queue_work(const struct lu_env *env, void *data)
2772 {
2773         struct client_obd *cli = data;
2774
2775         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2776
2777         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2778         RETURN(0);
2779 }
2780
/* obd_ops::o_setup hook: initialize an OSC device.  Sets up common client
 * obd state, the ptlrpcd writeback and LRU work items, quota, procfs
 * entries and the request pool, then registers the lock-cancel weight
 * callback for this namespace.  Errors unwind via the goto ladder below. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_register(obd->obd_name,
                                                       type->typ_procsym,
                                                       obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest.  NOTE(review): a procfs failure is deliberately
         * non-fatal — setup continues below regardless of rc. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        /* error path: destroy whichever work items were created */
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
2872
/* obd_ops::o_precleanup hook: staged teardown.  OBD_CLEANUP_EARLY
 * deactivates the import and stops pinging it; OBD_CLEANUP_EXPORTS
 * destroys the ptlrpcd work items, cleans up the import, and removes the
 * procfs entries registered in osc_setup(). */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
                }
        }
        RETURN(rc);
}
2918
/* obd_ops::o_cleanup hook: detach this OSC from the shared client cache,
 * free the quota cache, release common client obd state, and drop the
 * ptlrpcd reference taken in osc_setup(). */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup: unhook from the client-wide cache LRU and drop our
         * reference on the cache */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                cl_cache_decref(cli->cl_cache);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
2945
2946 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
2947 {
2948         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
2949         return rc > 0 ? 0: rc;
2950 }
2951
/* obd_ops::o_process_config hook: forward configuration records to the
 * common OSC handler above. */
static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
2956
/* Method table for the OSC obd device type, registered with the class
 * framework in osc_init(). */
static struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_iocontrol            = osc_iocontrol,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
2982
2983 static int __init osc_init(void)
2984 {
2985         bool enable_proc = true;
2986         struct obd_type *type;
2987         int rc;
2988         ENTRY;
2989
2990         /* print an address of _any_ initialized kernel symbol from this
2991          * module, to allow debugging with gdb that doesn't support data
2992          * symbols from modules.*/
2993         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
2994
2995         rc = lu_kmem_init(osc_caches);
2996         if (rc)
2997                 RETURN(rc);
2998
2999         type = class_search_type(LUSTRE_OSP_NAME);
3000         if (type != NULL && type->typ_procsym != NULL)
3001                 enable_proc = false;
3002
3003         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3004                                  LUSTRE_OSC_NAME, &osc_device_type);
3005         if (rc) {
3006                 lu_kmem_fini(osc_caches);
3007                 RETURN(rc);
3008         }
3009
3010         RETURN(rc);
3011 }
3012
/* Module exit point: unregister the OSC obd type, then release the slab
 * caches set up in osc_init(). */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3018
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register the module init/exit handlers via the libcfs module helper. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);