Whamcloud - gitweb
LU-3259 clio: cl_lock simplification
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <lustre_ha.h>
47 #include <lprocfs_status.h>
48 #include <lustre_ioctl.h>
49 #include <lustre_log.h>
50 #include <lustre_debug.h>
51 #include <lustre_param.h>
52 #include <lustre_fid.h>
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55
/* Per-RPC state for an asynchronous bulk read/write (BRW) request.
 * Stored in the request's rq_async_args and consumed by brw_interpret()
 * when the RPC completes. */
struct osc_brw_async_args {
        struct obdo              *aa_oa;            /* object attributes for this RPC */
        int                       aa_requested_nob; /* number of bytes requested */
        int                       aa_nio_count;     /* niobuf count for the transfer */
        obd_count                 aa_page_count;    /* entries in aa_ppga */
        int                       aa_resends;       /* resend attempts so far */
        struct brw_page **aa_ppga;                  /* pages making up the bulk */
        struct client_obd        *aa_cli;           /* owning client obd */
        struct list_head          aa_oaps;          /* async pages in this RPC */
        struct list_head          aa_exts;          /* extents covered by this RPC */
        struct obd_capa  *aa_ocapa;                 /* capability, if any */
        struct cl_req            *aa_clerq;         /* associated cl_req */
};

/* Grant-shrink RPCs reuse the BRW async-args layout (see
 * osc_shrink_grant_interpret(), which only touches aa_oa). */
#define osc_grant_args osc_brw_async_args
71
/* Async-args for simple RPCs (e.g. getattr) that only need to carry the
 * caller's obd_info through to the reply interpreter. */
struct osc_async_args {
        struct obd_info *aa_oi;  /* caller's request info; oi_cb_up is invoked on completion */
};
75
/* Async-args for setattr-style RPCs (setattr, punch); consumed by
 * osc_setattr_interpret(). */
struct osc_setattr_args {
        struct obdo             *sa_oa;      /* attributes sent; refreshed from the reply */
        obd_enqueue_update_f     sa_upcall;  /* completion callback */
        void                    *sa_cookie;  /* opaque argument for sa_upcall */
};
81
/* Async-args for OST_SYNC RPCs; consumed by osc_sync_interpret(). */
struct osc_fsync_args {
        struct obd_info *fa_oi;              /* caller's request info; oi_oa receives reply attrs */
        obd_enqueue_update_f     fa_upcall;  /* completion callback */
        void                    *fa_cookie;  /* opaque argument for fa_upcall */
};
87
/* Async-args for a DLM lock enqueue issued by the OSC. */
struct osc_enqueue_args {
        struct obd_export        *oa_exp;            /* export the enqueue was sent on */
        ldlm_type_t               oa_type;           /* DLM lock type */
        ldlm_mode_t               oa_mode;           /* requested lock mode */
        __u64                    *oa_flags;          /* caller-visible enqueue flags */
        __u64                     oa_flags_internal; /* presumably backing storage when the
                                                      * caller supplies no flags pointer —
                                                      * confirm against the enqueue path */
        osc_enqueue_upcall_f      oa_upcall;         /* completion callback */
        void                     *oa_cookie;         /* opaque argument for oa_upcall */
        struct ost_lvb           *oa_lvb;            /* lock value block */
        struct lustre_handle      oa_lockh;          /* handle of the granted lock */
        unsigned int              oa_agl:1;          /* set for AGL (async glimpse lock) requests */
};
100
101 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
102 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
103                          void *data, int rc);
104
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Converts an on-wire lov_mds_md (@lmm, little-endian) into an in-memory
 * lov_stripe_md at *@lsmp.  Supports the standard obd unpackmd calling
 * conventions:
 *   - @lsmp == NULL:                return the required lsm size only;
 *   - *@lsmp != NULL, @lmm == NULL: free the existing lsm, return 0;
 *   - *@lsmp == NULL:               allocate a fresh lsm before unpacking;
 *   - otherwise:                    reuse the existing lsm.
 *
 * Returns the lsm size on success, negative errno on failure.
 */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                /* Object id 0 is never a valid OST object. */
                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* An OSC-level object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: release the single oinfo, then the lsm itself. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        /* Unwind the lsm allocation on oinfo alloc failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Max object size: use the server-advertised limit when the
         * connection reports one, else the historical ext3-based default. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
166
167 static inline void osc_pack_capa(struct ptlrpc_request *req,
168                                  struct ost_body *body, void *capa)
169 {
170         struct obd_capa *oc = (struct obd_capa *)capa;
171         struct lustre_capa *c;
172
173         if (!capa)
174                 return;
175
176         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
177         LASSERT(c);
178         capa_cpy(c, oc);
179         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
180         DEBUG_CAPA(D_SEC, c, "pack");
181 }
182
/* Fill the OST request body from @oinfo: convert the obdo into wire
 * format and attach the capability, if any. */
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}
195
196 static inline void osc_set_capa_size(struct ptlrpc_request *req,
197                                      const struct req_msg_field *field,
198                                      struct obd_capa *oc)
199 {
200         if (oc == NULL)
201                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
202         else
203                 /* it is already calculated as sizeof struct obd_capa */
204                 ;
205 }
206
/* Reply handler for an async OST_GETATTR: copy the returned attributes
 * into the caller's obdo and invoke the registered up-call.  The up-call
 * runs even on error so the caller always observes the final rc. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                /* Reply arrived but the body could not be unpacked;
                 * invalidate the caller's attributes. */
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
235
/* Queue an asynchronous OST_GETATTR RPC on @set.  The result is
 * delivered via oinfo->oi_cb_up through osc_getattr_interpret().
 * Returns 0 on success, negative errno if the request cannot be built. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* Stash the caller's obd_info in the request's embedded
         * async-args area for the interpreter. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
267
/* Synchronous OST_GETATTR: fetch the object's attributes into
 * oinfo->oi_oa.  Returns 0 on success, negative errno on failure. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* Block size is filled in client-side, not sent by the OST.
         * NOTE(review): the async path (osc_getattr_interpret) uses
         * DT_MAX_BRW_SIZE here instead of cli_brw_size() — confirm the
         * difference is intentional. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
311
/* Synchronous OST_SETATTR: send the attributes in oinfo->oi_oa to the
 * OST and refresh oi_oa from the reply.  The obdo must carry a valid
 * group (OBD_MD_FLGROUP).  Returns 0 or negative errno. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* Propagate the attributes the server actually applied. */
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
353
/* Reply handler for setattr-style RPCs (setattr, punch): refresh the
 * caller's obdo from the reply and invoke the registered up-call.  The
 * up-call runs even on error so the caller always observes rc. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
374
/* Issue an asynchronous OST_SETATTR.
 *
 * Dispatch depends on @rqset:
 *   - NULL:         fire-and-forget via ptlrpcd, no reply processing;
 *   - PTLRPCD_SET:  handled by ptlrpcd with @upcall invoked on completion;
 *   - otherwise:    added to the caller's set, @upcall on completion.
 *
 * When @oti carries a log cookie (OBD_MD_FLCOOKIE) it is copied into the
 * obdo before packing.  Returns 0 on success, negative errno otherwise. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
425
/* OBD-API wrapper around osc_setattr_async_base(): completion is
 * reported through oinfo->oi_cb_up with oinfo itself as the cookie. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}
433
/* Synchronously create an object on the OST.
 *
 * @oa supplies the creation attributes and receives the resulting object
 * id; *@ea is the stripe metadata — allocated here when the caller passes
 * *ea == NULL (and freed again on failure in that case).  When @oti is
 * given and the server returned a llog cookie, it is stored in the
 * trans-info for later log cancellation.  Returns 0 or negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* Point at the embedded cookie if the caller did
                         * not supply its own cookie array. */
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if it was allocated locally above (i.e. the
         * caller's *ea is still NULL) and the create failed. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
517
/* Issue an asynchronous OST_PUNCH (truncate / hole-punch) RPC.  The
 * extent is carried in oinfo->oi_oa.  @upcall(@cookie, rc) is invoked
 * via osc_setattr_interpret() on completion; dispatch goes to ptlrpcd
 * when @rqset == PTLRPCD_SET, otherwise the request joins @rqset.
 * Returns 0 on success, negative errno otherwise. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* Punch shares the setattr reply path and async-args layout. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
562
/* Reply handler for OST_SYNC: copy the returned obdo back to the caller
 * and invoke the registered up-call.  The up-call runs even on error. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
585
/* Issue an asynchronous OST_SYNC RPC for the object in oinfo->oi_oa
 * (the oa's size/blocks fields carry the start/end of the range).
 * @upcall(@cookie, rc) runs on completion via osc_sync_interpret();
 * dispatch goes to ptlrpcd when @rqset == PTLRPCD_SET, otherwise the
 * request joins @rqset.  Returns 0 or negative errno. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
630
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* Look up (without creating) the resource for this object. */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
665
/* Completion handler for OST_DESTROY: drop the in-flight destroy count
 * and wake any thread throttled in osc_destroy(). */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}
676
/* Try to reserve a slot for a destroy RPC.  Returns 1 and keeps the
 * reservation (cl_destroy_in_flight incremented) when under the
 * max-RPCs-in-flight limit; otherwise releases the reservation and
 * returns 0, leaving the caller to wait on cl_destroy_waitq. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
694
/* OBD create entry point for the OSC.
 *
 * Only two cases still reach the OST from here: recreating lost objects
 * (OBD_FL_RECREATE_OBJS) and objects whose sequence is not MDT-owned.
 * Any other call is a programming error (LBUG) — regular object creation
 * is handled elsewhere. */
int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
{
        int rc = 0;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_RECREATE_OBJS) {
                RETURN(osc_real_create(exp, oa, ea, oti));
        }

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                RETURN(osc_real_create(exp, oa, ea, oti));

        /* we should not get here anymore */
        LBUG();

        RETURN(rc);
}
719
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel matching local PW locks early (ELC) and discard their
         * cached data — the object is going away anyway. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
799
/* Report this client's cache state (dirty bytes, undirty headroom, grant
 * and lost grant) to the server by filling the corresponding obdo fields.
 * Called under the loi list lock while packing a BRW request. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        /* The sanity checks below report accounting inconsistencies and,
         * when tripped, claim no more dirty headroom (o_undirty = 0). */
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* Advertise headroom: at least the dirty limit, but no less
                 * than what max_pages_per_rpc * (rpcs_in_flight+1) can use. */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
848
849 void osc_update_next_shrink(struct client_obd *cli)
850 {
851         cli->cl_next_shrink_grant =
852                 cfs_time_shift(cli->cl_grant_shrink_interval);
853         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
854                cli->cl_next_shrink_grant);
855 }
856
/* Credit @grant bytes back to this client's available grant.
 * cl_loi_list_lock serializes all grant accounting on the client_obd. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
863
864 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
865 {
866         if (body->oa.o_valid & OBD_MD_FLGRANT) {
867                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
868                 __osc_update_grant(cli, body->oa.o_grant);
869         }
870 }
871
872 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
873                               obd_count keylen, void *key, obd_count vallen,
874                               void *val, struct ptlrpc_request_set *set);
875
/* Reply callback for the KEY_GRANT_SHRINK set_info RPC sent by
 * osc_shrink_grant_to_target().  On RPC failure the grant that was
 * pre-deducted locally (stashed in oa->o_grant) is credited back; on
 * success any extra grant the server returned in the reply body is
 * absorbed.  The obdo allocated for the request is freed either way. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* Server never processed the shrink: restore the deducted
                 * amount so no grant is lost. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
896
897 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
898 {
899         client_obd_list_lock(&cli->cl_loi_list_lock);
900         oa->o_grant = cli->cl_avail_grant / 4;
901         cli->cl_avail_grant -= oa->o_grant;
902         client_obd_list_unlock(&cli->cl_loi_list_lock);
903         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
904                 oa->o_valid |= OBD_MD_FLFLAGS;
905                 oa->o_flags = 0;
906         }
907         oa->o_flags |= OBD_FL_SHRINK_GRANT;
908         osc_update_next_shrink(cli);
909 }
910
/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        /* Enough grant for cl_max_rpcs_in_flight + 1 full-sized RPCs. */
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Already at or below the in-flight target: go down to the amount
         * needed for one RPC instead. */
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
927
/* Shrink this client's grant down to @target_bytes by returning the surplus
 * to the server via a KEY_GRANT_SHRINK set_info RPC.  The target is clamped
 * so we never drop below one full RPC worth of grant; nothing is sent if we
 * already hold no more than the (clamped) target.
 * Returns 0 on success or when no shrink was needed, negative errno on
 * error (the deducted grant is restored on send failure). */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* Deduct the surplus now; osc_shrink_grant_interpret() restores it
         * if the RPC fails.  NOTE(review): the lock was dropped and retaken,
         * so cl_avail_grant may have changed since the check above —
         * presumably tolerated by the grant protocol; confirm. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
972
973 static int osc_should_shrink_grant(struct client_obd *client)
974 {
975         cfs_time_t time = cfs_time_current();
976         cfs_time_t next_shrink = client->cl_next_shrink_grant;
977
978         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
979              OBD_CONNECT_GRANT_SHRINK) == 0)
980                 return 0;
981
982         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
983                 /* Get the current RPC size directly, instead of going via:
984                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
985                  * Keep comment here so that it can be found by searching. */
986                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
987
988                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
989                     client->cl_avail_grant > brw_size)
990                         return 1;
991                 else
992                         osc_update_next_shrink(client);
993         }
994         return 0;
995 }
996
997 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
998 {
999         struct client_obd *client;
1000
1001         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1002                 if (osc_should_shrink_grant(client))
1003                         osc_shrink_grant(client);
1004         }
1005         return 0;
1006 }
1007
1008 static int osc_add_shrink_grant(struct client_obd *client)
1009 {
1010         int rc;
1011
1012         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1013                                        TIMEOUT_GRANT,
1014                                        osc_grant_shrink_grant_cb, NULL,
1015                                        &client->cl_grant_shrink_list);
1016         if (rc) {
1017                 CERROR("add grant client %s error %d\n",
1018                         client->cl_import->imp_obd->obd_name, rc);
1019                 return rc;
1020         }
1021         CDEBUG(D_CACHE, "add grant client %s \n",
1022                client->cl_import->imp_obd->obd_name);
1023         osc_update_next_shrink(client);
1024         return 0;
1025 }
1026
1027 static int osc_del_shrink_grant(struct client_obd *client)
1028 {
1029         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1030                                          TIMEOUT_GRANT);
1031 }
1032
/* Initialize grant accounting from the server's connect data (at connect
 * or reconnect time), derive the osc_extent chunk size, and register for
 * periodic grant shrinking when the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        /* NOTE(review): the dirty-page byte count is printed with %ld; if
         * cl_dirty_pages is an unsigned long, %lu would be exact — confirm
         * the field type. */
        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* Only register once: list_empty() guards against re-adding on
         * reconnect. */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1072
1073 /* We assume that the reason this OSC got a short read is because it read
1074  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1075  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1076  * this stripe never got written at or beyond this stripe offset yet. */
1077 static void handle_short_read(int nob_read, obd_count page_count,
1078                               struct brw_page **pga)
1079 {
1080         char *ptr;
1081         int i = 0;
1082
1083         /* skip bytes read OK */
1084         while (nob_read > 0) {
1085                 LASSERT (page_count > 0);
1086
1087                 if (pga[i]->count > nob_read) {
1088                         /* EOF inside this page */
1089                         ptr = kmap(pga[i]->pg) +
1090                                 (pga[i]->off & ~CFS_PAGE_MASK);
1091                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1092                         kunmap(pga[i]->pg);
1093                         page_count--;
1094                         i++;
1095                         break;
1096                 }
1097
1098                 nob_read -= pga[i]->count;
1099                 page_count--;
1100                 i++;
1101         }
1102
1103         /* zero remaining pages */
1104         while (page_count-- > 0) {
1105                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1106                 memset(ptr, 0, pga[i]->count);
1107                 kunmap(pga[i]->pg);
1108                 i++;
1109         }
1110 }
1111
1112 static int check_write_rcs(struct ptlrpc_request *req,
1113                            int requested_nob, int niocount,
1114                            obd_count page_count, struct brw_page **pga)
1115 {
1116         int     i;
1117         __u32   *remote_rcs;
1118
1119         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1120                                                   sizeof(*remote_rcs) *
1121                                                   niocount);
1122         if (remote_rcs == NULL) {
1123                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1124                 return(-EPROTO);
1125         }
1126
1127         /* return error if any niobuf was in error */
1128         for (i = 0; i < niocount; i++) {
1129                 if ((int)remote_rcs[i] < 0)
1130                         return(remote_rcs[i]);
1131
1132                 if (remote_rcs[i] != 0) {
1133                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1134                                 i, remote_rcs[i], req);
1135                         return(-EPROTO);
1136                 }
1137         }
1138
1139         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1140                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1141                        req->rq_bulk->bd_nob_transferred, requested_nob);
1142                 return(-EPROTO);
1143         }
1144
1145         return (0);
1146 }
1147
1148 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1149 {
1150         if (p1->flag != p2->flag) {
1151                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1152                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1153                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1154
1155                 /* warn if we try to combine flags that we don't know to be
1156                  * safe to combine */
1157                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1158                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1159                               "report this at https://jira.hpdd.intel.com/\n",
1160                               p1->flag, p2->flag);
1161                 }
1162                 return 0;
1163         }
1164
1165         return (p1->off + p1->count == p2->off);
1166 }
1167
1168 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1169                                    struct brw_page **pga, int opc,
1170                                    cksum_type_t cksum_type)
1171 {
1172         __u32                           cksum;
1173         int                             i = 0;
1174         struct cfs_crypto_hash_desc     *hdesc;
1175         unsigned int                    bufsize;
1176         int                             err;
1177         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1178
1179         LASSERT(pg_count > 0);
1180
1181         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1182         if (IS_ERR(hdesc)) {
1183                 CERROR("Unable to initialize checksum hash %s\n",
1184                        cfs_crypto_hash_name(cfs_alg));
1185                 return PTR_ERR(hdesc);
1186         }
1187
1188         while (nob > 0 && pg_count > 0) {
1189                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1190
1191                 /* corrupt the data before we compute the checksum, to
1192                  * simulate an OST->client data error */
1193                 if (i == 0 && opc == OST_READ &&
1194                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1195                         unsigned char *ptr = kmap(pga[i]->pg);
1196                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1197
1198                         memcpy(ptr + off, "bad1", min(4, nob));
1199                         kunmap(pga[i]->pg);
1200                 }
1201                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1202                                             pga[i]->off & ~CFS_PAGE_MASK,
1203                                             count);
1204                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1205                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1206
1207                 nob -= pga[i]->count;
1208                 pg_count--;
1209                 i++;
1210         }
1211
1212         bufsize = sizeof(cksum);
1213         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1214
1215         /* For sending we only compute the wrong checksum instead
1216          * of corrupting the data so it is still correct on a redo */
1217         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1218                 cksum++;
1219
1220         return cksum;
1221 }
1222
/* Build (but do not send) a BRW read or write RPC covering @page_count
 * pages in @pga.  Contiguous pages with identical flags are merged into
 * single remote niobufs, the bulk descriptor is prepared, bulk checksums
 * are computed for writes when enabled, and the per-request async args
 * consumed by the completion path are filled in.  On success *reqp is set
 * to the prepared request (which stores @oa in its async args) and 0 is
 * returned; otherwise a negative errno is returned and no request is
 * handed back. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* Writes come from the preallocated request pool so that dirty
         * pages can always be flushed even under memory pressure. */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Count remote niobufs: adjacent mergeable pages share one. */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                /* pages must arrive in strictly ascending file offset order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                /* merge contiguous same-flag pages into one remote niobuf */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggyback a grant shrink on this RPC when one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1429
/* A BRW_WRITE reply carried a bulk checksum.  If it differs from what the
 * client sent, recompute the checksum over the pages still held locally
 * (using the checksum type the server reported) to localize where the
 * corruption happened, and log a console error with the diagnosis.
 * Returns 0 when the checksums match, 1 when a mismatch was reported. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* re-hash with the type the server actually used */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        /* Compare old/new/server checksums to infer where the data changed. */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1474
1475 /* Note rc enters this function as number of bytes transferred */
1476 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1477 {
1478         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1479         const lnet_process_id_t *peer =
1480                         &req->rq_import->imp_connection->c_peer;
1481         struct client_obd *cli = aa->aa_cli;
1482         struct ost_body *body;
1483         __u32 client_cksum = 0;
1484         ENTRY;
1485
1486         if (rc < 0 && rc != -EDQUOT) {
1487                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1488                 RETURN(rc);
1489         }
1490
1491         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1492         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1493         if (body == NULL) {
1494                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1495                 RETURN(-EPROTO);
1496         }
1497
1498         /* set/clear over quota flag for a uid/gid */
1499         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1500             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1501                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1502
1503                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1504                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1505                        body->oa.o_flags);
1506                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1507         }
1508
1509         osc_update_grant(cli, body);
1510
1511         if (rc < 0)
1512                 RETURN(rc);
1513
1514         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1515                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1516
1517         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1518                 if (rc > 0) {
1519                         CERROR("Unexpected +ve rc %d\n", rc);
1520                         RETURN(-EPROTO);
1521                 }
1522                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1523
1524                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1525                         RETURN(-EAGAIN);
1526
1527                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1528                     check_write_checksum(&body->oa, peer, client_cksum,
1529                                          body->oa.o_cksum, aa->aa_requested_nob,
1530                                          aa->aa_page_count, aa->aa_ppga,
1531                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1532                         RETURN(-EAGAIN);
1533
1534                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1535                                      aa->aa_page_count, aa->aa_ppga);
1536                 GOTO(out, rc);
1537         }
1538
1539         /* The rest of this function executes only for OST_READs */
1540
1541         /* if unwrap_bulk failed, return -EAGAIN to retry */
1542         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1543         if (rc < 0)
1544                 GOTO(out, rc = -EAGAIN);
1545
1546         if (rc > aa->aa_requested_nob) {
1547                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1548                        aa->aa_requested_nob);
1549                 RETURN(-EPROTO);
1550         }
1551
1552         if (rc != req->rq_bulk->bd_nob_transferred) {
1553                 CERROR ("Unexpected rc %d (%d transferred)\n",
1554                         rc, req->rq_bulk->bd_nob_transferred);
1555                 return (-EPROTO);
1556         }
1557
1558         if (rc < aa->aa_requested_nob)
1559                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1560
1561         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1562                 static int cksum_counter;
1563                 __u32      server_cksum = body->oa.o_cksum;
1564                 char      *via;
1565                 char      *router;
1566                 cksum_type_t cksum_type;
1567
1568                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1569                                                body->oa.o_flags : 0);
1570                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1571                                                  aa->aa_ppga, OST_READ,
1572                                                  cksum_type);
1573
1574                 if (peer->nid == req->rq_bulk->bd_sender) {
1575                         via = router = "";
1576                 } else {
1577                         via = " via ";
1578                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1579                 }
1580
1581                 if (server_cksum != client_cksum) {
1582                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1583                                            "%s%s%s inode "DFID" object "DOSTID
1584                                            " extent ["LPU64"-"LPU64"]\n",
1585                                            req->rq_import->imp_obd->obd_name,
1586                                            libcfs_nid2str(peer->nid),
1587                                            via, router,
1588                                            body->oa.o_valid & OBD_MD_FLFID ?
1589                                                 body->oa.o_parent_seq : (__u64)0,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_parent_oid : 0,
1592                                            body->oa.o_valid & OBD_MD_FLFID ?
1593                                                 body->oa.o_parent_ver : 0,
1594                                            POSTID(&body->oa.o_oi),
1595                                            aa->aa_ppga[0]->off,
1596                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1597                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1598                                                                         1);
1599                         CERROR("client %x, server %x, cksum_type %x\n",
1600                                client_cksum, server_cksum, cksum_type);
1601                         cksum_counter = 0;
1602                         aa->aa_oa->o_cksum = client_cksum;
1603                         rc = -EAGAIN;
1604                 } else {
1605                         cksum_counter++;
1606                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1607                         rc = 0;
1608                 }
1609         } else if (unlikely(client_cksum)) {
1610                 static int cksum_missed;
1611
1612                 cksum_missed++;
1613                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1614                         CERROR("Checksum %u requested from %s but not sent\n",
1615                                cksum_missed, libcfs_nid2str(peer->nid));
1616         } else {
1617                 rc = 0;
1618         }
1619 out:
1620         if (rc >= 0)
1621                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1622                                      aa->aa_oa, &body->oa);
1623
1624         RETURN(rc);
1625 }
1626
/* Resend a BRW RPC that failed with a recoverable error.
 *
 * A fresh request is built from the same pages and obdo, the async args
 * (page list, extent list, capa) are moved over to it, and it is queued on
 * a ptlrpcd thread.  The resend is delayed by aa_resends seconds, capped
 * at the request timeout.
 *
 * \param request  the failed BRW request being redone
 * \param aa       async args of \a request (cli, oa, pages, extents, capa)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0       new request queued successfully
 * \retval -EINTR  an async page was interrupted, resend abandoned
 * \retval <0      osc_brw_prep_request() failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS resends are expected, log them quietly */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* Bail out before the handover if any page was interrupted; the new
         * request has no other references yet so just drop it. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* move (not copy) the oap and extent lists into the new async args */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* re-point each oap's request reference from the old request to the
         * new one; the old reference is dropped here */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capa ownership moves to the new request's args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1700
1701 /*
1702  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1703  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1704  * fine for our small page arrays and doesn't require allocation.  its an
1705  * insertion sort that swaps elements that are strides apart, shrinking the
1706  * stride down until its '1' and the array is sorted.
1707  */
1708 static void sort_brw_pages(struct brw_page **array, int num)
1709 {
1710         int stride, i, j;
1711         struct brw_page *tmp;
1712
1713         if (num == 1)
1714                 return;
1715         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1716                 ;
1717
1718         do {
1719                 stride /= 3;
1720                 for (i = stride ; i < num ; i++) {
1721                         tmp = array[i];
1722                         j = i;
1723                         while (j >= stride && array[j - stride]->off > tmp->off) {
1724                                 array[j] = array[j - stride];
1725                                 j -= stride;
1726                         }
1727                         array[j] = tmp;
1728                 }
1729         } while (stride > 1);
1730 }
1731
/* Free a brw_page pointer array of \a count entries; the pages themselves
 * are owned elsewhere, only the array allocation is released. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1737
/* Reply interpreter for BRW RPCs (set as rq_interpret_reply in
 * osc_build_rpc()).
 *
 * Finishes the request via osc_brw_fini_request(), resends on recoverable
 * errors, updates cached object attributes on success, finishes all extents
 * that rode in this RPC, drops the in-flight counter and kicks the IO
 * pipeline again.
 *
 * \param env   lu environment of the interpreting thread
 * \param req   the completed BRW request
 * \param data  osc_brw_async_args attached to \a req
 * \param rc    request status from ptlrpc
 *
 * \retval 0 on success or when a resend was queued, negative errno otherwise
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* import was evicted/reconnected since this request
                         * was sent; don't resend across the generation */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 here means the resend was queued; this request's
                 * pages now belong to the new request, nothing more to do */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* propagate attributes returned in the reply obdo into the
                 * cl_object attribute cache, under the attr lock */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_set(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        /* finish every extent carried by this RPC; this empties aa_oaps too */
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1861
/* Commit callback for BRW requests (set as rq_commit_cb in osc_build_rpc()).
 * Called when the server reports the transaction containing this write is
 * committed to disk, so the pages pinned as "unstable" can be released. */
static void brw_commit(struct ptlrpc_request *req)
{
        /* If osc_inc_unstable_pages (via osc_extent_finish) races with
         * this called via the rq_commit_cb, I need to ensure
         * osc_dec_unstable_pages is still called. Otherwise unstable
         * pages may be leaked. */
        spin_lock(&req->rq_lock);
        if (likely(req->rq_unstable)) {
                req->rq_unstable = 0;
                spin_unlock(&req->rq_lock);

                osc_dec_unstable_pages(req);
        } else {
                /* inc side hasn't run yet; mark committed so it knows to
                 * skip the unstable accounting (see the race note above) */
                req->rq_committed = 1;
                spin_unlock(&req->rq_lock);
        }
}
1879
1880 /**
1881  * Build an RPC by the list of extent @ext_list. The caller must ensure
1882  * that the total pages in this list are NOT over max pages per RPC.
1883  * Extents in the list must be in OES_RPC state.
1884  */
1885 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1886                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1887 {
1888         struct ptlrpc_request           *req = NULL;
1889         struct osc_extent               *ext;
1890         struct brw_page                 **pga = NULL;
1891         struct osc_brw_async_args       *aa = NULL;
1892         struct obdo                     *oa = NULL;
1893         struct osc_async_page           *oap;
1894         struct osc_async_page           *tmp;
1895         struct cl_req                   *clerq = NULL;
1896         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1897                                                                       CRT_READ;
1898         struct cl_req_attr              *crattr = NULL;
1899         obd_off                         starting_offset = OBD_OBJECT_EOF;
1900         obd_off                         ending_offset = 0;
1901         int                             mpflag = 0;
1902         int                             mem_tight = 0;
1903         int                             page_count = 0;
1904         bool                            soft_sync = false;
1905         int                             i;
1906         int                             rc;
1907         struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);
1908
1909         ENTRY;
1910         LASSERT(!list_empty(ext_list));
1911
1912         /* add pages into rpc_list to build BRW rpc */
1913         list_for_each_entry(ext, ext_list, oe_link) {
1914                 LASSERT(ext->oe_state == OES_RPC);
1915                 mem_tight |= ext->oe_memalloc;
1916                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1917                         ++page_count;
1918                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1919                         if (starting_offset > oap->oap_obj_off)
1920                                 starting_offset = oap->oap_obj_off;
1921                         else
1922                                 LASSERT(oap->oap_page_off == 0);
1923                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1924                                 ending_offset = oap->oap_obj_off +
1925                                                 oap->oap_count;
1926                         else
1927                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1928                                         PAGE_CACHE_SIZE);
1929                 }
1930         }
1931
1932         soft_sync = osc_over_unstable_soft_limit(cli);
1933         if (mem_tight)
1934                 mpflag = cfs_memory_pressure_get_and_set();
1935
1936         OBD_ALLOC(crattr, sizeof(*crattr));
1937         if (crattr == NULL)
1938                 GOTO(out, rc = -ENOMEM);
1939
1940         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1941         if (pga == NULL)
1942                 GOTO(out, rc = -ENOMEM);
1943
1944         OBDO_ALLOC(oa);
1945         if (oa == NULL)
1946                 GOTO(out, rc = -ENOMEM);
1947
1948         i = 0;
1949         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1950                 struct cl_page *page = oap2cl_page(oap);
1951                 if (clerq == NULL) {
1952                         clerq = cl_req_alloc(env, page, crt,
1953                                              1 /* only 1-object rpcs for now */);
1954                         if (IS_ERR(clerq))
1955                                 GOTO(out, rc = PTR_ERR(clerq));
1956                 }
1957                 if (mem_tight)
1958                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1959                 if (soft_sync)
1960                         oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
1961                 pga[i] = &oap->oap_brw_page;
1962                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1963                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1964                        pga[i]->pg, page_index(oap->oap_page), oap,
1965                        pga[i]->flag);
1966                 i++;
1967                 cl_req_page_add(env, clerq, page);
1968         }
1969
1970         /* always get the data for the obdo for the rpc */
1971         LASSERT(clerq != NULL);
1972         crattr->cra_oa = oa;
1973         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1974
1975         rc = cl_req_prep(env, clerq);
1976         if (rc != 0) {
1977                 CERROR("cl_req_prep failed: %d\n", rc);
1978                 GOTO(out, rc);
1979         }
1980
1981         sort_brw_pages(pga, page_count);
1982         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1983                         pga, &req, crattr->cra_capa, 1, 0);
1984         if (rc != 0) {
1985                 CERROR("prep_req failed: %d\n", rc);
1986                 GOTO(out, rc);
1987         }
1988
1989         req->rq_commit_cb = brw_commit;
1990         req->rq_interpret_reply = brw_interpret;
1991
1992         if (mem_tight != 0)
1993                 req->rq_memalloc = 1;
1994
1995         /* Need to update the timestamps after the request is built in case
1996          * we race with setattr (locally or in queue at OST).  If OST gets
1997          * later setattr before earlier BRW (as determined by the request xid),
1998          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1999          * way to do this in a single call.  bug 10150 */
2000         cl_req_attr_set(env, clerq, crattr,
2001                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2002
2003         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2004
2005         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2006         aa = ptlrpc_req_async_args(req);
2007         INIT_LIST_HEAD(&aa->aa_oaps);
2008         list_splice_init(&rpc_list, &aa->aa_oaps);
2009         INIT_LIST_HEAD(&aa->aa_exts);
2010         list_splice_init(ext_list, &aa->aa_exts);
2011         aa->aa_clerq = clerq;
2012
2013         /* queued sync pages can be torn down while the pages
2014          * were between the pending list and the rpc */
2015         tmp = NULL;
2016         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2017                 /* only one oap gets a request reference */
2018                 if (tmp == NULL)
2019                         tmp = oap;
2020                 if (oap->oap_interrupted && !req->rq_intr) {
2021                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2022                                         oap, req);
2023                         ptlrpc_mark_interrupted(req);
2024                 }
2025         }
2026         if (tmp != NULL)
2027                 tmp->oap_request = ptlrpc_request_addref(req);
2028
2029         client_obd_list_lock(&cli->cl_loi_list_lock);
2030         starting_offset >>= PAGE_CACHE_SHIFT;
2031         if (cmd == OBD_BRW_READ) {
2032                 cli->cl_r_in_flight++;
2033                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2034                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2035                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2036                                       starting_offset + 1);
2037         } else {
2038                 cli->cl_w_in_flight++;
2039                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2040                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2041                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2042                                       starting_offset + 1);
2043         }
2044         client_obd_list_unlock(&cli->cl_loi_list_lock);
2045
2046         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2047                   page_count, aa, cli->cl_r_in_flight,
2048                   cli->cl_w_in_flight);
2049
2050         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2051          * see which CPU/NUMA node the majority of pages were allocated
2052          * on, and try to assign the async RPC to the CPU core
2053          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2054          *
2055          * But on the other hand, we expect that multiple ptlrpcd
2056          * threads and the initial write sponsor can run in parallel,
2057          * especially when data checksum is enabled, which is CPU-bound
2058          * operation and single ptlrpcd thread cannot process in time.
2059          * So more ptlrpcd threads sharing BRW load
2060          * (with PDL_POLICY_ROUND) seems better.
2061          */
2062         ptlrpcd_add_req(req, pol, -1);
2063         rc = 0;
2064         EXIT;
2065
2066 out:
2067         if (mem_tight != 0)
2068                 cfs_memory_pressure_restore(mpflag);
2069
2070         if (crattr != NULL) {
2071                 capa_put(crattr->cra_capa);
2072                 OBD_FREE(crattr, sizeof(*crattr));
2073         }
2074
2075         if (rc != 0) {
2076                 LASSERT(req == NULL);
2077
2078                 if (oa)
2079                         OBDO_FREE(oa);
2080                 if (pga)
2081                         OBD_FREE(pga, sizeof(*pga) * page_count);
2082                 /* this should happen rarely and is pretty bad, it makes the
2083                  * pending list not follow the dirty order */
2084                 while (!list_empty(ext_list)) {
2085                         ext = list_entry(ext_list->next, struct osc_extent,
2086                                          oe_link);
2087                         list_del_init(&ext->oe_link);
2088                         osc_extent_finish(env, ext, 0, rc);
2089                 }
2090                 if (clerq && !IS_ERR(clerq))
2091                         cl_req_completion(env, clerq, rc);
2092         }
2093         RETURN(rc);
2094 }
2095
2096 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2097                                         struct ldlm_enqueue_info *einfo)
2098 {
2099         void *data = einfo->ei_cbdata;
2100         int set = 0;
2101
2102         LASSERT(lock != NULL);
2103         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2104         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2105         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2106         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2107
2108         lock_res_and_lock(lock);
2109
2110         if (lock->l_ast_data == NULL)
2111                 lock->l_ast_data = data;
2112         if (lock->l_ast_data == data)
2113                 set = 1;
2114
2115         unlock_res_and_lock(lock);
2116
2117         return set;
2118 }
2119
2120 static int osc_set_data_with_check(struct lustre_handle *lockh,
2121                                    struct ldlm_enqueue_info *einfo)
2122 {
2123         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2124         int set = 0;
2125
2126         if (lock != NULL) {
2127                 set = osc_set_lock_data_with_check(lock, einfo);
2128                 LDLM_LOCK_PUT(lock);
2129         } else
2130                 CERROR("lockh %p, data %p - client evicted?\n",
2131                        lockh, einfo->ei_cbdata);
2132         return set;
2133 }
2134
2135 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2136                              ldlm_iterator_t replace, void *data)
2137 {
2138         struct ldlm_res_id res_id;
2139         struct obd_device *obd = class_exp2obd(exp);
2140
2141         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2142         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2143         return 0;
2144 }
2145
2146 /* find any ldlm lock of the inode in osc
2147  * return 0    not find
2148  *        1    find one
2149  *      < 0    error */
2150 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2151                            ldlm_iterator_t replace, void *data)
2152 {
2153         struct ldlm_res_id res_id;
2154         struct obd_device *obd = class_exp2obd(exp);
2155         int rc = 0;
2156
2157         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2158         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2159         if (rc == LDLM_ITER_STOP)
2160                 return(1);
2161         if (rc == LDLM_ITER_CONTINUE)
2162                 return(0);
2163         return(rc);
2164 }
2165
/* Finish an OSC enqueue: translate an intent-abort reply into its embedded
 * status, mark the LVB ready where appropriate, invoke the caller's upcall
 * with the final errcode, and drop the enqueue reference on success.
 *
 * \param req     the enqueue request (used to unpack the DLM reply on
 *                intent abort)
 * \param upcall  caller's completion callback
 * \param cookie  opaque argument for \a upcall
 * \param lockh   handle of the (possibly) granted lock
 * \param mode    lock mode the reference was taken with
 * \param flags   in/out enqueue flags; LDLM_FL_LVB_READY may be set here
 * \param agl     non-zero for async glimpse locks (no LVB_READY on abort)
 * \param errcode enqueue status from ldlm_cli_enqueue_fini()
 *
 * \retval return value of the upcall
 */
static int osc_enqueue_fini(struct ptlrpc_request *req,
                            osc_enqueue_upcall_f upcall, void *cookie,
                            struct lustre_handle *lockh, ldlm_mode_t mode,
                            __u64 *flags, int agl, int errcode)
{
        bool intent = *flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* The request was created before ldlm_cli_enqueue call. */
        if (intent && errcode == ELDLM_LOCK_ABORTED) {
                struct ldlm_reply *rep;

                rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
                LASSERT(rep != NULL);

                /* the real status of an aborted intent enqueue is carried
                 * in lock_policy_res1 (network byte order) */
                rep->lock_policy_res1 =
                        ptlrpc_status_ntoh(rep->lock_policy_res1);
                if (rep->lock_policy_res1)
                        errcode = rep->lock_policy_res1;
                if (!agl)
                        *flags |= LDLM_FL_LVB_READY;
        } else if (errcode == ELDLM_OK) {
                *flags |= LDLM_FL_LVB_READY;
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, lockh, errcode);

        /* release the reference taken in ldlm_cli_enqueue() */
        if (errcode == ELDLM_LOCK_MATCHED)
                errcode = ELDLM_OK;
        if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
                ldlm_lock_decref(lockh, mode);

        RETURN(rc);
}
2203
/* Reply interpreter for asynchronous OSC enqueues.
 *
 * Completes the LDLM side via ldlm_cli_enqueue_fini() and then the OSC side
 * via osc_enqueue_fini().  An extra lock reference is held across the upcall
 * so that a blocking AST posted for a failed lock cannot run before the
 * upcall has finished.
 *
 * \param env  lu environment of the interpreting thread
 * \param req  the completed enqueue request
 * \param aa   osc_enqueue_args stashed in the request
 * \param rc   request status from ptlrpc
 *
 * \retval result of osc_enqueue_fini() (i.e. the upcall's return value)
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = &aa->oa_lockh;
        ldlm_mode_t mode = aa->oa_mode;
        struct ost_lvb *lvb = aa->oa_lvb;
        __u32 lvb_len = sizeof(*lvb);
        __u64 flags = 0;

        ENTRY;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(lockh);
        LASSERTF(lock != NULL,
                 "lockh "LPX64", req %p, aa %p - client evicted?\n",
                 lockh->cookie, req, aa);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(lockh, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* AGL enqueues carry no lvb/flags pointers; supply a local flags
         * word so the fini paths below have something to update */
        if (aa->oa_agl) {
                LASSERT(aa->oa_lvb == NULL);
                LASSERT(aa->oa_flags == NULL);
                aa->oa_flags = &flags;
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
                                   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
                                   lockh, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
                              aa->oa_flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* drop the additional reference taken above */
        ldlm_lock_decref(lockh, mode);
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}
2256
/* Sentinel request-set pointer: callers pass this to osc_enqueue_base() to
 * have the request handed to the ptlrpcd daemon instead of a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2258
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/*
 * Enqueue (or match) an extent DLM lock for this OSC.
 *
 * First tries to match an already-cached lock; otherwise sends an
 * LDLM_ENQUEUE RPC, either synchronously or (if \a async) completing
 * through osc_enqueue_interpret().  \a upcall is invoked with \a cookie
 * and the enqueue status in all completion paths.  \a agl marks an
 * asynchronous glimpse lock, which is enqueued purely in advance and
 * whose result the caller does not consume.
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL does not need the LVB to be ready on a matched lock. */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* For AGL, if there already exists a matched lock,
                         * return earlier and inform the caller. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Matched lock is already bound to another object:
                         * drop it and enqueue a fresh one below. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        if (intent) {
                /* Intent enqueues need a reply buffer sized for the LVB. */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        /* Stash completion state in the request so that
                         * osc_enqueue_interpret() can finish the job. */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: complete the OSC side (upcall, lock release)
         * right away. */
        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2409
2410 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2411                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2412                    __u64 *flags, void *data, struct lustre_handle *lockh,
2413                    int unref)
2414 {
2415         struct obd_device *obd = exp->exp_obd;
2416         __u64 lflags = *flags;
2417         ldlm_mode_t rc;
2418         ENTRY;
2419
2420         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2421                 RETURN(-EIO);
2422
2423         /* Filesystem lock extents are extended to page boundaries so that
2424          * dealing with the page cache is a little smoother */
2425         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2426         policy->l_extent.end |= ~CFS_PAGE_MASK;
2427
2428         /* Next, search for already existing extent locks that will cover us */
2429         /* If we're trying to read, we also search for an existing PW lock.  The
2430          * VFS and page cache already protect us locally, so lots of readers/
2431          * writers can share a single PW lock. */
2432         rc = mode;
2433         if (mode == LCK_PR)
2434                 rc |= LCK_PW;
2435         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2436                              res_id, type, policy, rc, lockh, unref);
2437         if (rc) {
2438                 if (data != NULL) {
2439                         if (!osc_set_data_with_check(lockh, data)) {
2440                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2441                                         ldlm_lock_decref(lockh, rc);
2442                                 RETURN(0);
2443                         }
2444                 }
2445                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2446                         ldlm_lock_addref(lockh, LCK_PR);
2447                         ldlm_lock_decref(lockh, LCK_PW);
2448                 }
2449                 RETURN(rc);
2450         }
2451         RETURN(rc);
2452 }
2453
2454 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2455 {
2456         ENTRY;
2457
2458         if (unlikely(mode == LCK_GROUP))
2459                 ldlm_lock_decref_and_cancel(lockh, mode);
2460         else
2461                 ldlm_lock_decref(lockh, mode);
2462
2463         RETURN(0);
2464 }
2465
/* Interpret callback for an asynchronous OST_STATFS RPC: copy the
 * obd_statfs reply into the caller's buffer and hand the final status to
 * the completion callback registered in the obd_info. */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* A NODELAY statfs treats connection trouble as "no data" rather
         * than as a hard error. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        /* Deliver the final status to the upper layer. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2498
/* Queue an asynchronous OST_STATFS RPC on \a rqset; the reply is handled
 * by osc_statfs_interpret(), which invokes oinfo->oi_cb_up.
 * NOTE(review): \a max_age is currently unused here — see the comment
 * below about passing it in the request. */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        /* Stash the obd_info so the interpret callback can complete it. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2542
/* Synchronous OST_STATFS: send the RPC and copy the reply into \a osfs.
 * NOTE(review): \a max_age and \a env are currently unused here — see the
 * comment below about passing max_age in the request. */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The import reference was only needed to pin it across the
         * allocation; drop it before checking the result. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2606
/* Handle ioctls directed at an OSC device.  A module reference is held
 * across the handler so the module cannot be unloaded mid-ioctl. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* Positive return means recovery was already in progress;
                 * report that as success. */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2646
/*
 * obd_get_info handler for the OSC: services three keys.
 *
 * - KEY_LOCK_TO_STRIPE: a single OSC object is always stripe 0.
 * - KEY_LAST_ID:        fetch the last allocated object id from the OST.
 * - KEY_FIEMAP:         fetch extent-mapping data; for FIEMAP_FLAG_SYNC a
 *                       PR lock over the mapped range is matched (or the
 *                       server is asked to lock) so the data is stable.
 *
 * Returns 0 on success, -EINVAL for unknown keys.
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* An OSC object is a single stripe, so it is always 0. */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* Best-effort query: do not block on a down server. */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Round the requested range out to page boundaries, and to
                 * EOF if the end would overflow. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* Hold the matched PW lock as PR for the
                                 * duration of the request. */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                /* Always held as PR at this point (downgraded above). */
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
2782
/*
 * obd_set_info_async handler for the OSC.
 *
 * Several keys are handled purely on the client (checksum toggle, sptlrpc
 * config/context, LRU cache registration and shrinking); everything else
 * is forwarded to the OST as an OST_SET_INFO RPC.  KEY_GRANT_SHRINK is
 * special: it carries an ost_body, is completed through
 * osc_shrink_grant_interpret(), and is sent via ptlrpcd rather than the
 * caller's \a set.
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                /* Attach this OSC to the shared client page cache. */
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* Shrink at most half of this OSC's LRU pages, and no more
                 * than the caller's remaining target. */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* Report back how much of the target is still unmet. */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        /* Grant-shrink requests carry an ost_body instead of a value
         * buffer, so no SETINFO_VAL field is needed. */
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                /* Keep a private copy of the obdo for the interpret
                 * callback; freed by osc_shrink_grant_interpret(). */
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2900
/* Recompute the grant to request from the OST on reconnect: ask for what
 * we currently account for (available grant plus grant consumed by dirty
 * pages), or two BRW-sizes worth if that is zero.  Lost grant is reset
 * and only reported for debugging.
 * NOTE(review): RETURN() is used without a visible ENTRY in this
 * function — confirm the debug-trace imbalance is intentional. */
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                client_obd_list_lock(&cli->cl_loi_list_lock);
                data->ocd_grant = (cli->cl_avail_grant +
                                  (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
                                  2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}
2927
/* Disconnect the OSC export: flush pending size-change llog cancels on
 * the last connection, disconnect the export, and finally remove this
 * client from the grant-shrink list (see the ordering note below). */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp, 0);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2969
2970 static int osc_import_event(struct obd_device *obd,
2971                             struct obd_import *imp,
2972                             enum obd_import_event event)
2973 {
2974         struct client_obd *cli;
2975         int rc = 0;
2976
2977         ENTRY;
2978         LASSERT(imp->imp_obd == obd);
2979
2980         switch (event) {
2981         case IMP_EVENT_DISCON: {
2982                 cli = &obd->u.cli;
2983                 client_obd_list_lock(&cli->cl_loi_list_lock);
2984                 cli->cl_avail_grant = 0;
2985                 cli->cl_lost_grant = 0;
2986                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2987                 break;
2988         }
2989         case IMP_EVENT_INACTIVE: {
2990                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2991                 break;
2992         }
2993         case IMP_EVENT_INVALIDATE: {
2994                 struct ldlm_namespace *ns = obd->obd_namespace;
2995                 struct lu_env         *env;
2996                 int                    refcheck;
2997
2998                 env = cl_env_get(&refcheck);
2999                 if (!IS_ERR(env)) {
3000                         /* Reset grants */
3001                         cli = &obd->u.cli;
3002                         /* all pages go to failing rpcs due to the invalid
3003                          * import */
3004                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3005
3006                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3007                         cl_env_put(env, &refcheck);
3008                 } else
3009                         rc = PTR_ERR(env);
3010                 break;
3011         }
3012         case IMP_EVENT_ACTIVE: {
3013                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3014                 break;
3015         }
3016         case IMP_EVENT_OCD: {
3017                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3018
3019                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3020                         osc_init_grant(&obd->u.cli, ocd);
3021
3022                 /* See bug 7198 */
3023                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3024                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3025
3026                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3027                 break;
3028         }
3029         case IMP_EVENT_DEACTIVATE: {
3030                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3031                 break;
3032         }
3033         case IMP_EVENT_ACTIVATE: {
3034                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3035                 break;
3036         }
3037         default:
3038                 CERROR("Unknown import event %d\n", event);
3039                 LBUG();
3040         }
3041         RETURN(rc);
3042 }
3043
3044 /**
3045  * Determine whether the lock can be canceled before replaying the lock
3046  * during recovery, see bug16774 for detailed information.
3047  *
3048  * \retval zero the lock can't be canceled
3049  * \retval other ok to cancel
3050  */
3051 static int osc_cancel_weight(struct ldlm_lock *lock)
3052 {
3053         /*
3054          * Cancel all unused and granted extent lock.
3055          */
3056         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3057             lock->l_granted_mode == lock->l_req_mode &&
3058             osc_ldlm_weigh_ast(lock) == 0)
3059                 RETURN(1);
3060
3061         RETURN(0);
3062 }
3063
3064 static int brw_queue_work(const struct lu_env *env, void *data)
3065 {
3066         struct client_obd *cli = data;
3067
3068         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3069
3070         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3071         RETURN(0);
3072 }
3073
/* Set up an OSC obd_device: generic client setup, ptlrpcd work items for
 * writeback and LRU shrinking, quota, proc entries, and a reserve request
 * pool.  On failure everything acquired so far is unwound via the goto
 * ladder at the bottom.  \retval 0 on success, negative errno otherwise. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        /* Pin the ptlrpcd threads for the lifetime of this device;
         * dropped in osc_cleanup() / on the error path below. */
        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Asynchronous writeback work item, run by ptlrpcd. */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* Asynchronous LRU page shrinking work item. */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
                                                           type->typ_procsym,
                                                           obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }
        /* NOTE(review): a non-zero rc from the proc setup above is dropped
         * here and the function still returns 0 — proc failure appears to
         * be deliberately non-fatal; confirm before changing. */

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);
        /* NOTE(review): a NULL pool result is not checked here —
         * presumably the request allocation path tolerates it; confirm. */

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        /* Let the DLM ask us which replay locks are worth canceling. */
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3165
3166 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3167 {
3168         int rc = 0;
3169         ENTRY;
3170
3171         switch (stage) {
3172         case OBD_CLEANUP_EARLY: {
3173                 struct obd_import *imp;
3174                 imp = obd->u.cli.cl_import;
3175                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3176                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3177                 ptlrpc_deactivate_import(imp);
3178                 spin_lock(&imp->imp_lock);
3179                 imp->imp_pingable = 0;
3180                 spin_unlock(&imp->imp_lock);
3181                 break;
3182         }
3183         case OBD_CLEANUP_EXPORTS: {
3184                 struct client_obd *cli = &obd->u.cli;
3185                 /* LU-464
3186                  * for echo client, export may be on zombie list, wait for
3187                  * zombie thread to cull it, because cli.cl_import will be
3188                  * cleared in client_disconnect_export():
3189                  *   class_export_destroy() -> obd_cleanup() ->
3190                  *   echo_device_free() -> echo_client_cleanup() ->
3191                  *   obd_disconnect() -> osc_disconnect() ->
3192                  *   client_disconnect_export()
3193                  */
3194                 obd_zombie_barrier();
3195                 if (cli->cl_writeback_work) {
3196                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3197                         cli->cl_writeback_work = NULL;
3198                 }
3199                 if (cli->cl_lru_work) {
3200                         ptlrpcd_destroy_work(cli->cl_lru_work);
3201                         cli->cl_lru_work = NULL;
3202                 }
3203                 obd_cleanup_client_import(obd);
3204                 ptlrpc_lprocfs_unregister_obd(obd);
3205                 lprocfs_obd_cleanup(obd);
3206                 rc = obd_llog_finish(obd, 0);
3207                 if (rc != 0)
3208                         CERROR("failed to cleanup llogging subsystems\n");
3209                 break;
3210                 }
3211         }
3212         RETURN(rc);
3213 }
3214
/* Final cleanup of an OSC device: detach it from the shared client LRU
 * cache, release quota state, run generic client cleanup, and drop the
 * ptlrpcd reference taken in osc_setup().  \retval result of
 * client_obd_cleanup(). */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                /* unlink this OSC from the cache's per-client LRU list
                 * under the cache lock, then drop our user reference */
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        /* balances ptlrpcd_addref() from osc_setup() */
        ptlrpcd_decref();
        RETURN(rc);
}
3241
3242 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3243 {
3244         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3245         return rc > 0 ? 0: rc;
3246 }
3247
3248 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3249 {
3250         return osc_process_config_base(obd, buf);
3251 }
3252
/* Method table wiring the generic OBD layer to this OSC implementation.
 * Entries with client_* handlers reuse the shared ptlrpc client helpers
 * rather than OSC-specific code. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3282
3283 extern struct lu_kmem_descr osc_caches[];
3284 extern struct lock_class_key osc_ast_guard_class;
3285
3286 int __init osc_init(void)
3287 {
3288         bool enable_proc = true;
3289         struct obd_type *type;
3290         int rc;
3291         ENTRY;
3292
3293         /* print an address of _any_ initialized kernel symbol from this
3294          * module, to allow debugging with gdb that doesn't support data
3295          * symbols from modules.*/
3296         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3297
3298         rc = lu_kmem_init(osc_caches);
3299         if (rc)
3300                 RETURN(rc);
3301
3302         type = class_search_type(LUSTRE_OSP_NAME);
3303         if (type != NULL && type->typ_procsym != NULL)
3304                 enable_proc = false;
3305
3306         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3307                                  LUSTRE_OSC_NAME, &osc_device_type);
3308         if (rc) {
3309                 lu_kmem_fini(osc_caches);
3310                 RETURN(rc);
3311         }
3312
3313         RETURN(rc);
3314 }
3315
/* Module unload: unregister the OSC obd type and release the slab
 * caches set up in osc_init().  The __exit marker is deliberately
 * commented out — presumably so the symbol stays callable outside the
 * unload path; TODO(review): confirm why. */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3321
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* libcfs wrapper around module_init()/module_exit() registration. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);