Whamcloud - gitweb
LU-4176 tests: re-enable sanity-hsm/test_31a
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC state for an async bulk read/write (BRW); stored in the
 * request's rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;            /* attrs sent with / returned by the BRW */
        int                       aa_requested_nob; /* bytes requested in this RPC */
        int                       aa_nio_count;     /* niobuf entries in the request */
        obd_count                 aa_page_count;    /* pages in aa_ppga */
        int                       aa_resends;       /* resend attempts so far */
        struct brw_page         **aa_ppga;          /* page array for the transfer */
        struct client_obd        *aa_cli;           /* owning client obd */
        struct list_head          aa_oaps;          /* osc_async_pages in this RPC */
        struct list_head          aa_exts;          /* osc extents covered by this RPC */
        struct obd_capa          *aa_ocapa;         /* capability used for the I/O */
        struct cl_req            *aa_clerq;         /* transient cl_req for the transfer */
};
68
69 #define osc_grant_args osc_brw_async_args
70
/* Async getattr state: the caller's obd_info, for the interpret callback. */
struct osc_async_args {
        struct obd_info *aa_oi;
};
74
/* Async setattr/punch state (see osc_setattr_interpret()). */
struct osc_setattr_args {
        struct obdo             *sa_oa;     /* attrs sent; reply is unpacked here */
        obd_enqueue_update_f     sa_upcall; /* completion callback */
        void                    *sa_cookie; /* opaque argument for sa_upcall */
};
80
/* Async OST_SYNC state (see osc_sync_interpret()). */
struct osc_fsync_args {
        struct obd_info         *fa_oi;     /* caller's obd_info; reply oa copied in */
        obd_enqueue_update_f     fa_upcall; /* completion callback */
        void                    *fa_cookie; /* opaque argument for fa_upcall */
};
86
/* State carried by an asynchronous DLM lock enqueue request. */
struct osc_enqueue_args {
        struct obd_export       *oa_exp;    /* export the lock is requested on */
        ldlm_type_t              oa_type;   /* requested lock type */
        ldlm_mode_t              oa_mode;   /* requested lock mode */
        __u64                   *oa_flags;  /* in/out LDLM flags */
        osc_enqueue_upcall_f     oa_upcall; /* completion callback */
        void                    *oa_cookie; /* opaque argument for oa_upcall */
        struct ost_lvb          *oa_lvb;    /* lock value block returned by server */
        struct lustre_handle     oa_lockh;  /* handle of the granted lock */
        unsigned int             oa_agl:1;  /* set for AGL (async glimpse) requests */
};
98
99 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
100 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
101                          void *data, int rc);
102
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Calling conventions (mirror obd_unpackmd()):
 *   - lsmp == NULL:                return the in-memory lsm size needed;
 *   - *lsmp != NULL && lmm == NULL: free *lsmp and return 0;
 *   - *lsmp == NULL:               allocate a fresh lsm, then unpack @lmm.
 *
 * \retval lsm_size on success, negative errno on failure.
 */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                /* A zero object id on disk is always invalid. */
                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* OSC objects always have exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free request: release the single oinfo, then the lsm. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                /* Pre-existing lsm with a zero object id is corrupt. */
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Use the server-advertised max object size when the server
         * supports it, else fall back to the ext3/ldiskfs limit. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
164
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166                                  struct ost_body *body, void *capa)
167 {
168         struct obd_capa *oc = (struct obd_capa *)capa;
169         struct lustre_capa *c;
170
171         if (!capa)
172                 return;
173
174         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
175         LASSERT(c);
176         capa_cpy(c, oc);
177         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178         DEBUG_CAPA(D_SEC, c, "pack");
179 }
180
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182                                      struct obd_info *oinfo)
183 {
184         struct ost_body *body;
185
186         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
187         LASSERT(body);
188
189         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
190                              oinfo->oi_oa);
191         osc_pack_capa(req, body, oinfo->oi_capa);
192 }
193
194 static inline void osc_set_capa_size(struct ptlrpc_request *req,
195                                      const struct req_msg_field *field,
196                                      struct obd_capa *oc)
197 {
198         if (oc == NULL)
199                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
200         else
201                 /* it is already calculated as sizeof struct obd_capa */
202                 ;
203 }
204
205 static int osc_getattr_interpret(const struct lu_env *env,
206                                  struct ptlrpc_request *req,
207                                  struct osc_async_args *aa, int rc)
208 {
209         struct ost_body *body;
210         ENTRY;
211
212         if (rc != 0)
213                 GOTO(out, rc);
214
215         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
216         if (body) {
217                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
218                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
219                                      aa->aa_oi->oi_oa, &body->oa);
220
221                 /* This should really be sent by the OST */
222                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
223                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
224         } else {
225                 CDEBUG(D_INFO, "can't unpack ost_body\n");
226                 rc = -EPROTO;
227                 aa->aa_oi->oi_oa->o_valid = 0;
228         }
229 out:
230         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
231         RETURN(rc);
232 }
233
/* Issue an OST_GETATTR asynchronously via request set @set.  The reply is
 * handled by osc_getattr_interpret(), which runs oinfo->oi_cb_up. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* Stash the caller's obd_info for the interpret callback. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
265
/* Synchronous OST_GETATTR: fetch attributes of the object identified by
 * oinfo->oi_oa, blocking until the server replies; on success the reply
 * attributes are unpacked back into oinfo->oi_oa. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* The server does not send a block size; use the client BRW size. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
309
310 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
311                        struct obd_info *oinfo, struct obd_trans_info *oti)
312 {
313         struct ptlrpc_request *req;
314         struct ost_body       *body;
315         int                    rc;
316         ENTRY;
317
318         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
344                              &body->oa);
345
346         EXIT;
347 out:
348         ptlrpc_req_finished(req);
349         RETURN(rc);
350 }
351
352 static int osc_setattr_interpret(const struct lu_env *env,
353                                  struct ptlrpc_request *req,
354                                  struct osc_setattr_args *sa, int rc)
355 {
356         struct ost_body *body;
357         ENTRY;
358
359         if (rc != 0)
360                 GOTO(out, rc);
361
362         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
363         if (body == NULL)
364                 GOTO(out, rc = -EPROTO);
365
366         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
367                              &body->oa);
368 out:
369         rc = sa->sa_upcall(sa->sa_cookie, rc);
370         RETURN(rc);
371 }
372
/* Send an OST_SETATTR asynchronously.
 *
 * \param upcall completion callback, run with the final status
 * \param cookie opaque argument passed to @upcall
 * \param rqset  NULL:        fire-and-forget via ptlrpcd, no upcall;
 *               PTLRPCD_SET: handled by ptlrpcd, upcall still runs;
 *               otherwise:   added to the caller's request set.
 */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Carry the caller's llog cookie along with the setattr. */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
423
424 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
425                              struct obd_trans_info *oti,
426                              struct ptlrpc_request_set *rqset)
427 {
428         return osc_setattr_async_base(exp, oinfo, oti,
429                                       oinfo->oi_cb_up, oinfo, rqset);
430 }
431
/* Synchronously create an object on the OST.
 *
 * \param oa  creation attributes; on success the assigned object id and
 *            (optionally) the llog cookie are returned through it
 * \param ea  in/out stripe MD; allocated here when *ea is NULL and freed
 *            again on failure in that case
 * \param oti transaction info; receives the llog cookie when the server
 *            returns one
 */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* The server sends no block size; use the client BRW size. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm when it was allocated above (i.e. *ea is still
         * the caller's NULL). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
515
/* Send an OST_PUNCH asynchronously.  Completion is reported through
 * @upcall(cookie, rc) from osc_setattr_interpret(); @rqset selects the
 * execution path as in osc_setattr_async_base() (PTLRPCD_SET or a
 * caller-owned set). */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
560
561 static int osc_sync_interpret(const struct lu_env *env,
562                               struct ptlrpc_request *req,
563                               void *arg, int rc)
564 {
565         struct osc_fsync_args *fa = arg;
566         struct ost_body *body;
567         ENTRY;
568
569         if (rc)
570                 GOTO(out, rc);
571
572         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
573         if (body == NULL) {
574                 CERROR ("can't unpack ost_body\n");
575                 GOTO(out, rc = -EPROTO);
576         }
577
578         *fa->fa_oi->oi_oa = body->oa;
579 out:
580         rc = fa->fa_upcall(fa->fa_cookie, rc);
581         RETURN(rc);
582 }
583
/* Send an OST_SYNC asynchronously to flush an object range to stable
 * storage.  Completion is reported through @upcall(cookie, rc) from
 * osc_sync_interpret(); @rqset selects ptlrpcd vs a caller-owned set. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
628
/* Find and cancel locally locks matched by @mode in the resource found by
 * the object id in @oa.  Found locks are added into the @cancels list.
 *
 * \retval the number of locks added to @cancels (0 when ELC is disabled
 *         via procfs or the resource does not exist).
 */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
663
/* Completion handler for OST_DESTROY: release one destroy-in-flight slot
 * and wake any thread throttled in osc_destroy().  Always returns 0 --
 * destroy failures are not propagated to the client. */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}
674
/* Try to reserve a slot for one destroy RPC.
 *
 * \retval 1 slot reserved; cl_destroy_in_flight stays incremented and
 *           osc_destroy_interpret() releases it on completion
 * \retval 0 too many destroys in flight; the counter is restored and the
 *           caller must wait on cl_destroy_waitq
 */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Undo the increment; if the counter moved below the limit in the
         * meantime, some other destroy completed, so wake a waiter. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
692
693 int osc_create(const struct lu_env *env, struct obd_export *exp,
694                struct obdo *oa, struct lov_stripe_md **ea,
695                struct obd_trans_info *oti)
696 {
697         int rc = 0;
698         ENTRY;
699
700         LASSERT(oa);
701         LASSERT(ea);
702         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
703
704         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
705             oa->o_flags == OBD_FL_RECREATE_OBJS) {
706                 RETURN(osc_real_create(exp, oa, ea, oti));
707         }
708
709         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
710                 RETURN(osc_real_create(exp, oa, ea, oti));
711
712         /* we should not get here anymore */
713         LBUG();
714
715         RETURN(rc);
716 }
717
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel matching local locks first so their dirty data is
         * discarded rather than flushed to a dying object. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destory is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        /* NOTE(review): o_flags is tested without first checking
         * OBD_MD_FLFLAGS in o_valid -- presumably callers always
         * initialize o_flags; confirm before relying on it. */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
797
/* Fill the dirty/grant accounting fields of @oa so that every RPC
 * piggy-backs the client's cache state to the server for grant flow
 * control.  o_undirty advertises how much more the client could dirty;
 * it is forced to 0 whenever the accounting looks inconsistent. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                /* Advertise at least a full RPC pipeline's worth of
                 * headroom so the server keeps granting enough space. */
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
846
847 void osc_update_next_shrink(struct client_obd *cli)
848 {
849         cli->cl_next_shrink_grant =
850                 cfs_time_shift(cli->cl_grant_shrink_interval);
851         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
852                cli->cl_next_shrink_grant);
853 }
854
/* Fold @grant bytes back into the client's available grant.  All grant
 * accounting is protected by cl_loi_list_lock. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
861
862 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
863 {
864         if (body->oa.o_valid & OBD_MD_FLGRANT) {
865                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
866                 __osc_update_grant(cli, body->oa.o_grant);
867         }
868 }
869
870 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
871                               obd_count keylen, void *key, obd_count vallen,
872                               void *val, struct ptlrpc_request_set *set);
873
874 static int osc_shrink_grant_interpret(const struct lu_env *env,
875                                       struct ptlrpc_request *req,
876                                       void *aa, int rc)
877 {
878         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
879         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
880         struct ost_body *body;
881
882         if (rc != 0) {
883                 __osc_update_grant(cli, oa->o_grant);
884                 GOTO(out, rc);
885         }
886
887         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
888         LASSERT(body);
889         osc_update_grant(cli, body);
890 out:
891         OBDO_FREE(oa);
892         return rc;
893 }
894
895 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
896 {
897         client_obd_list_lock(&cli->cl_loi_list_lock);
898         oa->o_grant = cli->cl_avail_grant / 4;
899         cli->cl_avail_grant -= oa->o_grant;
900         client_obd_list_unlock(&cli->cl_loi_list_lock);
901         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
902                 oa->o_valid |= OBD_MD_FLFLAGS;
903                 oa->o_flags = 0;
904         }
905         oa->o_flags |= OBD_FL_SHRINK_GRANT;
906         osc_update_next_shrink(cli);
907 }
908
909 /* Shrink the current grant, either from some large amount to enough for a
910  * full set of in-flight RPCs, or if we have already shrunk to that limit
911  * then to enough for a single RPC.  This avoids keeping more grant than
912  * needed, and avoids shrinking the grant piecemeal. */
913 static int osc_shrink_grant(struct client_obd *cli)
914 {
915         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
916                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
917
918         client_obd_list_lock(&cli->cl_loi_list_lock);
919         if (cli->cl_avail_grant <= target_bytes)
920                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
921         client_obd_list_unlock(&cli->cl_loi_list_lock);
922
923         return osc_shrink_grant_to_target(cli, target_bytes);
924 }
925
/* Shrink this client's available grant down to @target_bytes and notify the
 * server of the returned amount via a KEY_GRANT_SHRINK set_info RPC.
 * Returns 0 on success or if no shrink was needed, negative errno on error.
 * If the RPC fails, the interpret callback restores the grant locally. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* Re-take the lock: cl_avail_grant may have changed since the check
         * above, so the actual amount returned is recomputed here. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        /* o_flags is only meaningful once OBD_MD_FLFLAGS is set. */
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* Request was never sent: restore the grant immediately. */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
970
971 static int osc_should_shrink_grant(struct client_obd *client)
972 {
973         cfs_time_t time = cfs_time_current();
974         cfs_time_t next_shrink = client->cl_next_shrink_grant;
975
976         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
977              OBD_CONNECT_GRANT_SHRINK) == 0)
978                 return 0;
979
980         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
981                 /* Get the current RPC size directly, instead of going via:
982                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
983                  * Keep comment here so that it can be found by searching. */
984                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
985
986                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
987                     client->cl_avail_grant > brw_size)
988                         return 1;
989                 else
990                         osc_update_next_shrink(client);
991         }
992         return 0;
993 }
994
995 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
996 {
997         struct client_obd *client;
998
999         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1000                 if (osc_should_shrink_grant(client))
1001                         osc_shrink_grant(client);
1002         }
1003         return 0;
1004 }
1005
1006 static int osc_add_shrink_grant(struct client_obd *client)
1007 {
1008         int rc;
1009
1010         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1011                                        TIMEOUT_GRANT,
1012                                        osc_grant_shrink_grant_cb, NULL,
1013                                        &client->cl_grant_shrink_list);
1014         if (rc) {
1015                 CERROR("add grant client %s error %d\n",
1016                         client->cl_import->imp_obd->obd_name, rc);
1017                 return rc;
1018         }
1019         CDEBUG(D_CACHE, "add grant client %s \n",
1020                client->cl_import->imp_obd->obd_name);
1021         osc_update_next_shrink(client);
1022         return 0;
1023 }
1024
1025 static int osc_del_shrink_grant(struct client_obd *client)
1026 {
1027         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1028                                          TIMEOUT_GRANT);
1029 }
1030
/* Initialize this client's grant accounting from the server's connect data,
 * pick the osc_extent chunk size, and register for periodic grant shrinking
 * if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* Only register once: the list head is non-empty after the first
         * osc_add_shrink_grant(). */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1070
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * @nob_read    bytes actually transferred by the bulk
 * @page_count  number of entries in @pga
 * @pga         pages of the read, in file-offset order
 *
 * Pages fully covered by @nob_read are left untouched; the page containing
 * the EOF is zero-filled from the EOF to its end, and all later pages are
 * zero-filled entirely. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                /* this page was fully read; move on to the next one */
                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
1109
1110 static int check_write_rcs(struct ptlrpc_request *req,
1111                            int requested_nob, int niocount,
1112                            obd_count page_count, struct brw_page **pga)
1113 {
1114         int     i;
1115         __u32   *remote_rcs;
1116
1117         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1118                                                   sizeof(*remote_rcs) *
1119                                                   niocount);
1120         if (remote_rcs == NULL) {
1121                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1122                 return(-EPROTO);
1123         }
1124
1125         /* return error if any niobuf was in error */
1126         for (i = 0; i < niocount; i++) {
1127                 if ((int)remote_rcs[i] < 0)
1128                         return(remote_rcs[i]);
1129
1130                 if (remote_rcs[i] != 0) {
1131                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1132                                 i, remote_rcs[i], req);
1133                         return(-EPROTO);
1134                 }
1135         }
1136
1137         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1138                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1139                        req->rq_bulk->bd_nob_transferred, requested_nob);
1140                 return(-EPROTO);
1141         }
1142
1143         return (0);
1144 }
1145
1146 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1147 {
1148         if (p1->flag != p2->flag) {
1149                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1150                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1151                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1152
1153                 /* warn if we try to combine flags that we don't know to be
1154                  * safe to combine */
1155                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1156                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1157                               "report this at https://jira.hpdd.intel.com/\n",
1158                               p1->flag, p2->flag);
1159                 }
1160                 return 0;
1161         }
1162
1163         return (p1->off + p1->count == p2->off);
1164 }
1165
1166 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1167                                    struct brw_page **pga, int opc,
1168                                    cksum_type_t cksum_type)
1169 {
1170         __u32                           cksum;
1171         int                             i = 0;
1172         struct cfs_crypto_hash_desc     *hdesc;
1173         unsigned int                    bufsize;
1174         int                             err;
1175         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1176
1177         LASSERT(pg_count > 0);
1178
1179         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1180         if (IS_ERR(hdesc)) {
1181                 CERROR("Unable to initialize checksum hash %s\n",
1182                        cfs_crypto_hash_name(cfs_alg));
1183                 return PTR_ERR(hdesc);
1184         }
1185
1186         while (nob > 0 && pg_count > 0) {
1187                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1188
1189                 /* corrupt the data before we compute the checksum, to
1190                  * simulate an OST->client data error */
1191                 if (i == 0 && opc == OST_READ &&
1192                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1193                         unsigned char *ptr = kmap(pga[i]->pg);
1194                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1195
1196                         memcpy(ptr + off, "bad1", min(4, nob));
1197                         kunmap(pga[i]->pg);
1198                 }
1199                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1200                                             pga[i]->off & ~CFS_PAGE_MASK,
1201                                             count);
1202                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1203                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1204
1205                 nob -= pga[i]->count;
1206                 pg_count--;
1207                 i++;
1208         }
1209
1210         bufsize = sizeof(cksum);
1211         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1212
1213         /* For sending we only compute the wrong checksum instead
1214          * of corrupting the data so it is still correct on a redo */
1215         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1216                 cksum++;
1217
1218         return cksum;
1219 }
1220
/* Build a bulk read/write RPC for @page_count pages described by @pga.
 *
 * @cmd       OBD_BRW_WRITE or OBD_BRW_READ (plus modifier flags)
 * @cli       client obd issuing the I/O
 * @oa        on-disk attributes for the object; ownership passes into the
 *            request's async args (freed when the request completes)
 * @pga       pages in strictly increasing file-offset order
 * @reqp      on success, *reqp is set to the prepared (unsent) request
 * @ocapa     capability, referenced if @reserve is set
 * @resend    nonzero marks the request with OBD_FL_RECOV_RESEND
 *
 * Returns 0 and sets *reqp on success, negative errno on failure. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Count remote niobufs: contiguous same-flag pages share one. */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                /* Extend the previous niobuf if this page merges into it,
                 * otherwise start a new one. */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* Piggy-back a grant shrink on this RPC if one is due. */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* For reads, just ask the server to checksum its reply. */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1427
/* Diagnose a write-checksum mismatch reported by the server.
 *
 * Recomputes the checksum over the client's pages using the server's
 * checksum type and compares the three values (original client, server,
 * recomputed) to guess where the corruption happened.
 *
 * Returns 0 if the checksums actually match, 1 if a mismatch is confirmed
 * (the caller is expected to resend). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* Recompute with the type the server used, which may differ from
         * the one we originally requested. */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1472
1473 /* Note rc enters this function as number of bytes transferred */
1474 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1475 {
1476         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1477         const lnet_process_id_t *peer =
1478                         &req->rq_import->imp_connection->c_peer;
1479         struct client_obd *cli = aa->aa_cli;
1480         struct ost_body *body;
1481         __u32 client_cksum = 0;
1482         ENTRY;
1483
1484         if (rc < 0 && rc != -EDQUOT) {
1485                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1486                 RETURN(rc);
1487         }
1488
1489         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1490         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1491         if (body == NULL) {
1492                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1493                 RETURN(-EPROTO);
1494         }
1495
1496         /* set/clear over quota flag for a uid/gid */
1497         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1498             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1499                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1500
1501                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1502                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1503                        body->oa.o_flags);
1504                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1505         }
1506
1507         osc_update_grant(cli, body);
1508
1509         if (rc < 0)
1510                 RETURN(rc);
1511
1512         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1513                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1514
1515         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1516                 if (rc > 0) {
1517                         CERROR("Unexpected +ve rc %d\n", rc);
1518                         RETURN(-EPROTO);
1519                 }
1520                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1521
1522                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1523                         RETURN(-EAGAIN);
1524
1525                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1526                     check_write_checksum(&body->oa, peer, client_cksum,
1527                                          body->oa.o_cksum, aa->aa_requested_nob,
1528                                          aa->aa_page_count, aa->aa_ppga,
1529                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1530                         RETURN(-EAGAIN);
1531
1532                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1533                                      aa->aa_page_count, aa->aa_ppga);
1534                 GOTO(out, rc);
1535         }
1536
1537         /* The rest of this function executes only for OST_READs */
1538
1539         /* if unwrap_bulk failed, return -EAGAIN to retry */
1540         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1541         if (rc < 0)
1542                 GOTO(out, rc = -EAGAIN);
1543
1544         if (rc > aa->aa_requested_nob) {
1545                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1546                        aa->aa_requested_nob);
1547                 RETURN(-EPROTO);
1548         }
1549
1550         if (rc != req->rq_bulk->bd_nob_transferred) {
1551                 CERROR ("Unexpected rc %d (%d transferred)\n",
1552                         rc, req->rq_bulk->bd_nob_transferred);
1553                 return (-EPROTO);
1554         }
1555
1556         if (rc < aa->aa_requested_nob)
1557                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1558
1559         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1560                 static int cksum_counter;
1561                 __u32      server_cksum = body->oa.o_cksum;
1562                 char      *via;
1563                 char      *router;
1564                 cksum_type_t cksum_type;
1565
1566                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1567                                                body->oa.o_flags : 0);
1568                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1569                                                  aa->aa_ppga, OST_READ,
1570                                                  cksum_type);
1571
1572                 if (peer->nid == req->rq_bulk->bd_sender) {
1573                         via = router = "";
1574                 } else {
1575                         via = " via ";
1576                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1577                 }
1578
1579                 if (server_cksum != client_cksum) {
1580                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1581                                            "%s%s%s inode "DFID" object "DOSTID
1582                                            " extent ["LPU64"-"LPU64"]\n",
1583                                            req->rq_import->imp_obd->obd_name,
1584                                            libcfs_nid2str(peer->nid),
1585                                            via, router,
1586                                            body->oa.o_valid & OBD_MD_FLFID ?
1587                                                 body->oa.o_parent_seq : (__u64)0,
1588                                            body->oa.o_valid & OBD_MD_FLFID ?
1589                                                 body->oa.o_parent_oid : 0,
1590                                            body->oa.o_valid & OBD_MD_FLFID ?
1591                                                 body->oa.o_parent_ver : 0,
1592                                            POSTID(&body->oa.o_oi),
1593                                            aa->aa_ppga[0]->off,
1594                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1595                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1596                                                                         1);
1597                         CERROR("client %x, server %x, cksum_type %x\n",
1598                                client_cksum, server_cksum, cksum_type);
1599                         cksum_counter = 0;
1600                         aa->aa_oa->o_cksum = client_cksum;
1601                         rc = -EAGAIN;
1602                 } else {
1603                         cksum_counter++;
1604                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1605                         rc = 0;
1606                 }
1607         } else if (unlikely(client_cksum)) {
1608                 static int cksum_missed;
1609
1610                 cksum_missed++;
1611                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1612                         CERROR("Checksum %u requested from %s but not sent\n",
1613                                cksum_missed, libcfs_nid2str(peer->nid));
1614         } else {
1615                 rc = 0;
1616         }
1617 out:
1618         if (rc >= 0)
1619                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1620                                      aa->aa_oa, &body->oa);
1621
1622         RETURN(rc);
1623 }
1624
/*
 * Rebuild and resend a bulk BRW request that failed with a recoverable
 * error (e.g. -EINPROGRESS or a checksum retry).
 *
 * A brand new ptlrpc request is prepared from the async args of the old
 * one; the page array (pga) and async pages (oaps) are handed over to
 * the new request, and the new request is queued on a ptlrpcd thread.
 *
 * \param request  the failed BRW request being replaced
 * \param aa       async args of \a request (pages, oaps, extents, oa)
 * \param rc       the recoverable error that triggered the redo
 *
 * \retval 0       new request queued successfully
 * \retval -EINTR  an async page was interrupted; redo abandoned
 * \retval <0      osc_brw_prep_request() failure
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	/* -EINPROGRESS resends are routine, log them quietly */
	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
					OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  NULL /* lsm unused by osc currently */,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, aa->aa_ocapa, 0, 1);
	if (rc)
		RETURN(rc);

	/* If any oap was interrupted while the RPC was in flight, abort
	 * the redo instead of resending on behalf of a dead waiter. */
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
	else
		new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	/* new_aa aliases the copy of rq_async_args made above; re-init the
	 * embedded list heads before splicing, since the struct copy left
	 * them pointing into the old request's lists. */
	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	/* Move the per-oap request reference from the old request to the
	 * new one (drop old ref, take new ref). */
	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* capability ownership moves to the new request's args */
	new_aa->aa_ocapa = aa->aa_ocapa;
	aa->aa_ocapa = NULL;

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
1698
1699 /*
1700  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1701  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1702  * fine for our small page arrays and doesn't require allocation.  its an
1703  * insertion sort that swaps elements that are strides apart, shrinking the
1704  * stride down until its '1' and the array is sorted.
1705  */
1706 static void sort_brw_pages(struct brw_page **array, int num)
1707 {
1708         int stride, i, j;
1709         struct brw_page *tmp;
1710
1711         if (num == 1)
1712                 return;
1713         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1714                 ;
1715
1716         do {
1717                 stride /= 3;
1718                 for (i = stride ; i < num ; i++) {
1719                         tmp = array[i];
1720                         j = i;
1721                         while (j >= stride && array[j - stride]->off > tmp->off) {
1722                                 array[j] = array[j - stride];
1723                                 j -= stride;
1724                         }
1725                         array[j] = tmp;
1726                 }
1727         } while (stride > 1);
1728 }
1729
/*
 * Free the brw_page pointer array built for a BRW RPC.  Only the array
 * itself is freed here; the brw_page entries it points at are embedded
 * in their osc_async_pages and owned elsewhere.
 */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
	LASSERT(ppga != NULL);
	OBD_FREE(ppga, sizeof(*ppga) * count);
}
1735
/*
 * Interpret callback for a BRW RPC: finalize the bulk transfer, handle
 * recoverable errors by resending, propagate returned attributes to the
 * cl_object, finish the extents, and update in-flight RPC accounting.
 *
 * \param env   lu environment of the ptlrpcd thread
 * \param req   the completed BRW request
 * \param data  osc_brw_async_args attached to \a req
 * \param rc    RPC completion status
 *
 * \retval 0 on success or if a redo was successfully queued; negative
 *	   errno otherwise.
 */
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	ENTRY;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When server return -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			/* import was evicted/reconnected since this request
			 * was sent; do not resend across the eviction */
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       ""LPU64":"LPU64", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		/* rc == 0 means the redo was queued; completion will be
		 * handled when the new request finishes */
		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	/* release capability reference held for the RPC, if any */
	if (aa->aa_ocapa) {
		capa_put(aa->aa_ocapa);
		aa->aa_ocapa = NULL;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		/* the pages were sorted by offset, so the last pga entry
		 * covers the highest file offset of this RPC */
		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		/* copy attributes the server marked valid in the reply oa
		 * into the cl_object, under the attr lock */
		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off +
				last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_set(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBDO_FREE(aa->aa_oa);

	/* successful writes pin pages as "unstable" until the server
	 * commits the transaction; see brw_commit() for the release */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
			  req->rq_bulk->bd_nob_transferred);
	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	/* a slot just freed up; try to dispatch more queued RPCs */
	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
	RETURN(rc);
}
1859
1860 static void brw_commit(struct ptlrpc_request *req)
1861 {
1862         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1863          * this called via the rq_commit_cb, I need to ensure
1864          * osc_dec_unstable_pages is still called. Otherwise unstable
1865          * pages may be leaked. */
1866         spin_lock(&req->rq_lock);
1867         if (likely(req->rq_unstable)) {
1868                 req->rq_unstable = 0;
1869                 spin_unlock(&req->rq_lock);
1870
1871                 osc_dec_unstable_pages(req);
1872         } else {
1873                 req->rq_committed = 1;
1874                 spin_unlock(&req->rq_lock);
1875         }
1876 }
1877
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success, ownership of the extents (and their async pages) is
 * transferred to the request's async args; on failure every extent is
 * finished with an error and all intermediate allocations are released.
 *
 * \param env       lu environment
 * \param cli       client obd the RPC is sent through
 * \param ext_list  extents to pack into one BRW RPC (spliced away)
 * \param cmd       OBD_BRW_READ or OBD_BRW_WRITE (plus flags)
 * \param pol       ptlrpcd scheduling policy for the async request
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
	struct ptlrpc_request		*req = NULL;
	struct osc_extent		*ext;
	struct brw_page			**pga = NULL;
	struct osc_brw_async_args	*aa = NULL;
	struct obdo			*oa = NULL;
	struct osc_async_page		*oap;
	struct osc_async_page		*tmp;
	struct cl_req			*clerq = NULL;
	enum cl_req_type		crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
								      CRT_READ;
	struct cl_req_attr		*crattr = NULL;
	obd_off				starting_offset = OBD_OBJECT_EOF;
	obd_off				ending_offset = 0;
	int				mpflag = 0;
	int				mem_tight = 0;
	int				page_count = 0;
	bool				soft_sync = false;
	int				i;
	int				rc;
	struct list_head		rpc_list = LIST_HEAD_INIT(rpc_list);

	ENTRY;
	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			++page_count;
			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			/* only the first and last page of the RPC may be
			 * partial; interior pages must be full */
			if (starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_CACHE_SIZE);
		}
	}

	soft_sync = osc_over_unstable_soft_limit(cli);
	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	OBD_ALLOC(crattr, sizeof(*crattr));
	if (crattr == NULL)
		GOTO(out, rc = -ENOMEM);

	OBD_ALLOC(pga, sizeof(*pga) * page_count);
	if (pga == NULL)
		GOTO(out, rc = -ENOMEM);

	OBDO_ALLOC(oa);
	if (oa == NULL)
		GOTO(out, rc = -ENOMEM);

	/* fill the pga array and attach pages to the cl_req */
	i = 0;
	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
		struct cl_page *page = oap2cl_page(oap);
		if (clerq == NULL) {
			clerq = cl_req_alloc(env, page, crt,
					     1 /* only 1-object rpcs for now */);
			if (IS_ERR(clerq))
				GOTO(out, rc = PTR_ERR(clerq));
		}
		if (mem_tight)
			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
		if (soft_sync)
			oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
		pga[i] = &oap->oap_brw_page;
		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
		       pga[i]->pg, page_index(oap->oap_page), oap,
		       pga[i]->flag);
		i++;
		cl_req_page_add(env, clerq, page);
	}

	/* always get the data for the obdo for the rpc */
	LASSERT(clerq != NULL);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, clerq, crattr, ~0ULL);

	rc = cl_req_prep(env, clerq);
	if (rc != 0) {
		CERROR("cl_req_prep failed: %d\n", rc);
		GOTO(out, rc);
	}

	/* sort by offset so the target can allocate disk in offset order */
	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
			pga, &req, crattr->cra_capa, 1, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		GOTO(out, rc);
	}

	req->rq_commit_cb = brw_commit;
	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST).  If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps.  Sadly, there is no obvious
	 * way to do this in a single call.  bug 10150 */
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	/* hand the page/extent lists over to the request's async args */
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;

	/* queued sync pages can be torn down while the pages
	 * were between the pending list and the rpc */
	tmp = NULL;
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (tmp == NULL)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
					oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp != NULL)
		tmp->oap_request = ptlrpc_request_addref(req);

	/* bump in-flight counters and lprocfs histograms under the list
	 * lock; brw_interpret() decrements on completion */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);

	/* XXX: Maybe the caller can check the RPC bulk descriptor to
	 * see which CPU/NUMA node the majority of pages were allocated
	 * on, and try to assign the async RPC to the CPU core
	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
	 *
	 * But on the other hand, we expect that multiple ptlrpcd
	 * threads and the initial write sponsor can run in parallel,
	 * especially when data checksum is enabled, which is CPU-bound
	 * operation and single ptlrpcd thread cannot process in time.
	 * So more ptlrpcd threads sharing BRW load
	 * (with PDL_POLICY_ROUND) seems better.
	 */
	ptlrpcd_add_req(req, pol, -1);
	rc = 0;
	EXIT;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (crattr != NULL) {
		capa_put(crattr->cra_capa);
		OBD_FREE(crattr, sizeof(*crattr));
	}

	if (rc != 0) {
		/* error path: req was never created (or was consumed by
		 * osc_brw_prep_request's own cleanup) */
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		if (pga)
			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	RETURN(rc);
}
2093
2094 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2095                                         struct ldlm_enqueue_info *einfo)
2096 {
2097         void *data = einfo->ei_cbdata;
2098         int set = 0;
2099
2100         LASSERT(lock != NULL);
2101         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2102         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2103         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2104         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2105
2106         lock_res_and_lock(lock);
2107
2108         if (lock->l_ast_data == NULL)
2109                 lock->l_ast_data = data;
2110         if (lock->l_ast_data == data)
2111                 set = 1;
2112
2113         unlock_res_and_lock(lock);
2114
2115         return set;
2116 }
2117
2118 static int osc_set_data_with_check(struct lustre_handle *lockh,
2119                                    struct ldlm_enqueue_info *einfo)
2120 {
2121         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2122         int set = 0;
2123
2124         if (lock != NULL) {
2125                 set = osc_set_lock_data_with_check(lock, einfo);
2126                 LDLM_LOCK_PUT(lock);
2127         } else
2128                 CERROR("lockh %p, data %p - client evicted?\n",
2129                        lockh, einfo->ei_cbdata);
2130         return set;
2131 }
2132
2133 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2134                              ldlm_iterator_t replace, void *data)
2135 {
2136         struct ldlm_res_id res_id;
2137         struct obd_device *obd = class_exp2obd(exp);
2138
2139         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2140         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2141         return 0;
2142 }
2143
2144 /* find any ldlm lock of the inode in osc
2145  * return 0    not find
2146  *        1    find one
2147  *      < 0    error */
2148 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2149                            ldlm_iterator_t replace, void *data)
2150 {
2151         struct ldlm_res_id res_id;
2152         struct obd_device *obd = class_exp2obd(exp);
2153         int rc = 0;
2154
2155         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2156         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2157         if (rc == LDLM_ITER_STOP)
2158                 return(1);
2159         if (rc == LDLM_ITER_CONTINUE)
2160                 return(0);
2161         return(rc);
2162 }
2163
2164 static int osc_enqueue_fini(struct ptlrpc_request *req,
2165                             osc_enqueue_upcall_f upcall, void *cookie,
2166                             struct lustre_handle *lockh, ldlm_mode_t mode,
2167                             __u64 *flags, int agl, int errcode)
2168 {
2169         bool intent = *flags & LDLM_FL_HAS_INTENT;
2170         int rc;
2171         ENTRY;
2172
2173         /* The request was created before ldlm_cli_enqueue call. */
2174         if (intent && errcode == ELDLM_LOCK_ABORTED) {
2175                 struct ldlm_reply *rep;
2176
2177                 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2178                 LASSERT(rep != NULL);
2179
2180                 rep->lock_policy_res1 =
2181                         ptlrpc_status_ntoh(rep->lock_policy_res1);
2182                 if (rep->lock_policy_res1)
2183                         errcode = rep->lock_policy_res1;
2184                 if (!agl)
2185                         *flags |= LDLM_FL_LVB_READY;
2186         } else if (errcode == ELDLM_OK) {
2187                 *flags |= LDLM_FL_LVB_READY;
2188         }
2189
2190         /* Call the update callback. */
2191         rc = (*upcall)(cookie, lockh, errcode);
2192
2193         /* release the reference taken in ldlm_cli_enqueue() */
2194         if (errcode == ELDLM_LOCK_MATCHED)
2195                 errcode = ELDLM_OK;
2196         if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2197                 ldlm_lock_decref(lockh, mode);
2198
2199         RETURN(rc);
2200 }
2201
/*
 * Interpret callback for an asynchronous lock enqueue: complete the
 * ldlm-level enqueue, then run the OSC-level finish (upcall, reference
 * release) via osc_enqueue_fini().
 *
 * \param env  lu environment (unused here)
 * \param req  the enqueue request
 * \param aa   osc_enqueue_args saved at send time
 * \param rc   RPC completion status
 *
 * \retval the upcall's return value propagated from osc_enqueue_fini()
 */
static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle *lockh = &aa->oa_lockh;
	ldlm_mode_t mode = aa->oa_mode;
	struct ost_lvb *lvb = aa->oa_lvb;
	__u32 lvb_len = sizeof(*lvb);
	__u64 flags = 0;

	ENTRY;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(lockh);
	LASSERTF(lock != NULL,
		 "lockh "LPX64", req %p, aa %p - client evicted?\n",
		 lockh->cookie, req, aa);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(lockh, mode);

	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

	/* Let CP AST to grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	/* AGL (async glimpse lock) enqueues carry no lvb/flags of their
	 * own; borrow a local flags word for the fini calls below */
	if (aa->oa_agl) {
		LASSERT(aa->oa_lvb == NULL);
		LASSERT(aa->oa_flags == NULL);
		aa->oa_flags = &flags;
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
				   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
				   lockh, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
			      aa->oa_flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* drop the extra reference taken above, then the handle2lock ref */
	ldlm_lock_decref(lockh, mode);
	LDLM_LOCK_PUT(lock);
	RETURN(rc);
}
2254
2255 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2256
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     osc_enqueue_upcall_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct lustre_handle lockh = { 0 };
        struct ptlrpc_request *req = NULL;
        /* intent enqueues carry an LVB in the reply, so a request must be
         * packed up front */
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL (async glimpse) may match locks whose LVB is not ready yet */
        __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, &lockh, 0);
        if (mode) {
                struct ldlm_lock *matched;

                if (*flags & LDLM_FL_TEST_LOCK)
                        RETURN(ELDLM_OK);

                matched = ldlm_handle2lock(&lockh);
                if (agl) {
                        /* AGL enqueues DLM locks speculatively. Therefore if
                         * a DLM lock already exists, it will just inform the
                         * caller to cancel the AGL process for this stripe. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;

                        /* We already have a lock, and it's referenced. */
                        (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* Matched lock cannot be reused; drop it and fall
                         * through to enqueue a fresh one. */
                        ldlm_lock_decref(&lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

no_match:
        if (*flags & LDLM_FL_TEST_LOCK)
                RETURN(-ENOLCK);

        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, &lockh, async);
        if (async) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_exp    = exp;
                        aa->oa_mode   = einfo->ei_mode;
                        aa->oa_type   = einfo->ei_type;
                        lustre_handle_copy(&aa->oa_lockh, &lockh);
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_agl    = !!agl;
                        if (!agl) {
                                aa->oa_flags  = flags;
                                aa->oa_lvb    = lvb;
                        } else {
                                /* AGL is essentially to enqueue an DLM lock
                                 * in advance, so we don't care about the
                                 * result of AGL enqueue. */
                                aa->oa_lvb    = NULL;
                                aa->oa_flags  = NULL;
                        }

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
                              flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2408
2409 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2410                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2411                    __u64 *flags, void *data, struct lustre_handle *lockh,
2412                    int unref)
2413 {
2414         struct obd_device *obd = exp->exp_obd;
2415         __u64 lflags = *flags;
2416         ldlm_mode_t rc;
2417         ENTRY;
2418
2419         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2420                 RETURN(-EIO);
2421
2422         /* Filesystem lock extents are extended to page boundaries so that
2423          * dealing with the page cache is a little smoother */
2424         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2425         policy->l_extent.end |= ~CFS_PAGE_MASK;
2426
2427         /* Next, search for already existing extent locks that will cover us */
2428         /* If we're trying to read, we also search for an existing PW lock.  The
2429          * VFS and page cache already protect us locally, so lots of readers/
2430          * writers can share a single PW lock. */
2431         rc = mode;
2432         if (mode == LCK_PR)
2433                 rc |= LCK_PW;
2434         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2435                              res_id, type, policy, rc, lockh, unref);
2436         if (rc) {
2437                 if (data != NULL) {
2438                         if (!osc_set_data_with_check(lockh, data)) {
2439                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2440                                         ldlm_lock_decref(lockh, rc);
2441                                 RETURN(0);
2442                         }
2443                 }
2444                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2445                         ldlm_lock_addref(lockh, LCK_PR);
2446                         ldlm_lock_decref(lockh, LCK_PW);
2447                 }
2448                 RETURN(rc);
2449         }
2450         RETURN(rc);
2451 }
2452
2453 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2454 {
2455         ENTRY;
2456
2457         if (unlikely(mode == LCK_GROUP))
2458                 ldlm_lock_decref_and_cancel(lockh, mode);
2459         else
2460                 ldlm_lock_decref(lockh, mode);
2461
2462         RETURN(0);
2463 }
2464
2465 static int osc_statfs_interpret(const struct lu_env *env,
2466                                 struct ptlrpc_request *req,
2467                                 struct osc_async_args *aa, int rc)
2468 {
2469         struct obd_statfs *msfs;
2470         ENTRY;
2471
2472         if (rc == -EBADR)
2473                 /* The request has in fact never been sent
2474                  * due to issues at a higher level (LOV).
2475                  * Exit immediately since the caller is
2476                  * aware of the problem and takes care
2477                  * of the clean up */
2478                  RETURN(rc);
2479
2480         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2481             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2482                 GOTO(out, rc = 0);
2483
2484         if (rc != 0)
2485                 GOTO(out, rc);
2486
2487         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2488         if (msfs == NULL) {
2489                 GOTO(out, rc = -EPROTO);
2490         }
2491
2492         *aa->aa_oi->oi_osfs = *msfs;
2493 out:
2494         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2495         RETURN(rc);
2496 }
2497
/* Issue an OST_STATFS RPC asynchronously; the reply is processed by
 * osc_statfs_interpret(), which runs oinfo->oi_cb_up().  Note max_age
 * is currently unused here (see comment below). */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery
                 * to avoid a deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2541
/* Synchronous statfs: send an OST_STATFS RPC and copy the reply into
 * @osfs.  max_age is currently not transmitted to the server (see the
 * comment below). */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* Since the request might also come from lprocfs, we need to
         * sync this with client_disconnect_export (Bug15684): take a
         * reference on the import under cl_sem so that it cannot go
         * away while we allocate the request. */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request holds its own import reference from here on. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery
                 * to avoid a deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2605
/* Device-level ioctl handler for the OSC: client recovery, import
 * activation, quota-check polling and target ping.  Anything else is
 * rejected with -ENOTTY. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        /* Pin this module for the duration of the ioctl so it cannot be
         * unloaded underneath us. */
        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* normalize positive returns to success */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2645
/* Handle OBD get_info queries on the OSC: lock-to-stripe mapping, the
 * last object id allocated by the OST, and FIEMAP extent mapping. */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* An OSC object is a single stripe, so the answer is
                 * always stripe index 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                /* Ask the OST for the last object id it allocated. */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* do not block or resend across recovery for this query */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                /* FIEMAP_FLAG_SYNC: try to match a cached PR/PW lock over
                 * the requested range; if none is cached, ask the server
                 * to take the lock on its side (OBD_FL_SRVLOCK below). */
                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                /* round the end up to a page boundary, clamping to
                 * OBD_OBJECT_EOF when the sum would overflow it */
                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* matched a PW lock: swap our reference to
                                 * PR so drop_lock below is balanced */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
2781
/* Handle OBD set_info requests on the OSC.  Several keys are consumed
 * locally (checksum toggle, sptlrpc config/context, client cache setup,
 * LRU shrink); everything else is forwarded to the OST as an
 * OST_SET_INFO (or grant-shrink) RPC. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* offer at most half of this OSC's LRU pages */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how many pages remain to be shrunk */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant-shrink requests carry an ost_body instead of a raw value */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* keep a copy of the obdo for the interpret callback */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2899
2900 static int osc_reconnect(const struct lu_env *env,
2901                          struct obd_export *exp, struct obd_device *obd,
2902                          struct obd_uuid *cluuid,
2903                          struct obd_connect_data *data,
2904                          void *localdata)
2905 {
2906         struct client_obd *cli = &obd->u.cli;
2907
2908         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2909                 long lost_grant;
2910
2911                 client_obd_list_lock(&cli->cl_loi_list_lock);
2912                 data->ocd_grant = (cli->cl_avail_grant +
2913                                   (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
2914                                   2 * cli_brw_size(obd);
2915                 lost_grant = cli->cl_lost_grant;
2916                 cli->cl_lost_grant = 0;
2917                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2918
2919                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
2920                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2921                        data->ocd_version, data->ocd_grant, lost_grant);
2922         }
2923
2924         RETURN(0);
2925 }
2926
2927 static int osc_disconnect(struct obd_export *exp)
2928 {
2929         struct obd_device *obd = class_exp2obd(exp);
2930         int rc;
2931
2932         rc = client_disconnect_export(exp);
2933         /**
2934          * Initially we put del_shrink_grant before disconnect_export, but it
2935          * causes the following problem if setup (connect) and cleanup
2936          * (disconnect) are tangled together.
2937          *      connect p1                     disconnect p2
2938          *   ptlrpc_connect_import
2939          *     ...............               class_manual_cleanup
2940          *                                     osc_disconnect
2941          *                                     del_shrink_grant
2942          *   ptlrpc_connect_interrupt
2943          *     init_grant_shrink
2944          *   add this client to shrink list
2945          *                                      cleanup_osc
2946          * Bang! pinger trigger the shrink.
2947          * So the osc should be disconnected from the shrink list, after we
2948          * are sure the import has been destroyed. BUG18662
2949          */
2950         if (obd->u.cli.cl_import == NULL)
2951                 osc_del_shrink_grant(&obd->u.cli);
2952         return rc;
2953 }
2954
2955 static int osc_import_event(struct obd_device *obd,
2956                             struct obd_import *imp,
2957                             enum obd_import_event event)
2958 {
2959         struct client_obd *cli;
2960         int rc = 0;
2961
2962         ENTRY;
2963         LASSERT(imp->imp_obd == obd);
2964
2965         switch (event) {
2966         case IMP_EVENT_DISCON: {
2967                 cli = &obd->u.cli;
2968                 client_obd_list_lock(&cli->cl_loi_list_lock);
2969                 cli->cl_avail_grant = 0;
2970                 cli->cl_lost_grant = 0;
2971                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2972                 break;
2973         }
2974         case IMP_EVENT_INACTIVE: {
2975                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2976                 break;
2977         }
2978         case IMP_EVENT_INVALIDATE: {
2979                 struct ldlm_namespace *ns = obd->obd_namespace;
2980                 struct lu_env         *env;
2981                 int                    refcheck;
2982
2983                 env = cl_env_get(&refcheck);
2984                 if (!IS_ERR(env)) {
2985                         /* Reset grants */
2986                         cli = &obd->u.cli;
2987                         /* all pages go to failing rpcs due to the invalid
2988                          * import */
2989                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2990
2991                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
2992                         cl_env_put(env, &refcheck);
2993                 } else
2994                         rc = PTR_ERR(env);
2995                 break;
2996         }
2997         case IMP_EVENT_ACTIVE: {
2998                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
2999                 break;
3000         }
3001         case IMP_EVENT_OCD: {
3002                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3003
3004                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3005                         osc_init_grant(&obd->u.cli, ocd);
3006
3007                 /* See bug 7198 */
3008                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3009                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3010
3011                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3012                 break;
3013         }
3014         case IMP_EVENT_DEACTIVATE: {
3015                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3016                 break;
3017         }
3018         case IMP_EVENT_ACTIVATE: {
3019                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3020                 break;
3021         }
3022         default:
3023                 CERROR("Unknown import event %d\n", event);
3024                 LBUG();
3025         }
3026         RETURN(rc);
3027 }
3028
3029 /**
3030  * Determine whether the lock can be canceled before replaying the lock
3031  * during recovery, see bug16774 for detailed information.
3032  *
3033  * \retval zero the lock can't be canceled
3034  * \retval other ok to cancel
3035  */
3036 static int osc_cancel_weight(struct ldlm_lock *lock)
3037 {
3038         /*
3039          * Cancel all unused and granted extent lock.
3040          */
3041         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3042             lock->l_granted_mode == lock->l_req_mode &&
3043             osc_ldlm_weigh_ast(lock) == 0)
3044                 RETURN(1);
3045
3046         RETURN(0);
3047 }
3048
3049 static int brw_queue_work(const struct lu_env *env, void *data)
3050 {
3051         struct client_obd *cli = data;
3052
3053         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3054
3055         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3056         RETURN(0);
3057 }
3058
/**
 * Set up an OSC obd_device from a configuration record.
 *
 * Acquires a ptlrpcd reference, performs the generic client setup, creates
 * the writeback and LRU ptlrpcd work items, initializes quota and the grant
 * shrink interval, registers the procfs tree, pre-allocates an RPC request
 * pool and registers the lock cancel-weight callback.  Failures before the
 * procfs stage unwind via the goto ladder; procfs registration failure is
 * tolerated (the function still returns 0).
 *
 * \param[in] obd   obd device being set up
 * \param[in] lcfg  configuration record
 *
 * \retval 0 on success (including a failed procfs registration)
 * \retval negative errno on setup failure
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        /* Hold a ptlrpcd reference for the lifetime of this obd;
         * released in osc_cleanup() or on the error path below. */
        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Work item that flushes dirty pages from the ptlrpcd context. */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* Work item that trims the client page LRU. */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
                                                           type->typ_procsym,
                                                           obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        /* Non-fatal: clear the entry and carry on. */
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3150
/**
 * Staged pre-cleanup of an OSC obd_device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS destroys the ptlrpcd work items and tears down the
 * client import and procfs state.  Final teardown happens in osc_cleanup().
 *
 * \retval 0 always (rc is never set to anything else here)
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                /* Stop the pinger from touching this import. */
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                struct client_obd *cli = &obd->u.cli;
                /* LU-464
                 * for echo client, export may be on zombie list, wait for
                 * zombie thread to cull it, because cli.cl_import will be
                 * cleared in client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
                obd_zombie_barrier();
                /* Destroy the work items created in osc_setup() before the
                 * import they reference goes away. */
                if (cli->cl_writeback_work) {
                        ptlrpcd_destroy_work(cli->cl_writeback_work);
                        cli->cl_writeback_work = NULL;
                }
                if (cli->cl_lru_work) {
                        ptlrpcd_destroy_work(cli->cl_lru_work);
                        cli->cl_lru_work = NULL;
                }
                obd_cleanup_client_import(obd);
                ptlrpc_lprocfs_unregister_obd(obd);
                lprocfs_obd_cleanup(obd);
                break;
                }
        }
        RETURN(rc);
}
3196
/**
 * Final cleanup of an OSC obd_device (after osc_precleanup()).
 *
 * Detaches the client from the shared LRU cache, frees the quota cache,
 * runs the generic client cleanup and drops the ptlrpcd reference taken
 * in osc_setup().
 *
 * \retval return code of client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct client_obd *cli = &obd->u.cli;
        int rc;

        ENTRY;

        /* lru cleanup */
        if (cli->cl_cache != NULL) {
                LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
                /* Unlink from the shared cache's per-OSC LRU list under
                 * its lock, then drop our reference on the cache. */
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_del_init(&cli->cl_lru_osc);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);
                cli->cl_lru_left = NULL;
                atomic_dec(&cli->cl_cache->ccc_users);
                cli->cl_cache = NULL;
        }

        /* free memory of osc quota cache */
        osc_quota_cleanup(obd);

        rc = client_obd_cleanup(obd);

        /* Balances ptlrpcd_addref() in osc_setup(). */
        ptlrpcd_decref();
        RETURN(rc);
}
3223
3224 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3225 {
3226         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3227         return rc > 0 ? 0: rc;
3228 }
3229
/* obd_ops entry point for config processing; 'buf' carries a
 * struct lustre_cfg and 'len' is unused here. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3234
/* Method table registered for the OSC obd type.  Connection management
 * is delegated to the generic client_* helpers; everything else is
 * implemented by the osc_* functions in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3264
3265 extern struct lu_kmem_descr osc_caches[];
3266 extern struct lock_class_key osc_ast_guard_class;
3267
3268 int __init osc_init(void)
3269 {
3270         bool enable_proc = true;
3271         struct obd_type *type;
3272         int rc;
3273         ENTRY;
3274
3275         /* print an address of _any_ initialized kernel symbol from this
3276          * module, to allow debugging with gdb that doesn't support data
3277          * symbols from modules.*/
3278         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3279
3280         rc = lu_kmem_init(osc_caches);
3281         if (rc)
3282                 RETURN(rc);
3283
3284         type = class_search_type(LUSTRE_OSP_NAME);
3285         if (type != NULL && type->typ_procsym != NULL)
3286                 enable_proc = false;
3287
3288         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3289                                  LUSTRE_OSC_NAME, &osc_device_type);
3290         if (rc) {
3291                 lu_kmem_fini(osc_caches);
3292                 RETURN(rc);
3293         }
3294
3295         RETURN(rc);
3296 }
3297
/* Module exit: unregister the OSC obd type, then release the slab
 * caches created in osc_init() (order matters: no new devices can be
 * created once the type is gone). */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
3303
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Hook osc_init/osc_exit into the kernel module loader. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);