/* Source: fs/lustre-release.git, lustre/osc/osc_request.c
 * (Whamcloud gitweb; commit "LU-2675 osc: remove obsolete llog handling") */
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #include <lustre_dlm.h>
42 #include <lustre_net.h>
43 #include <lustre/lustre_user.h>
44 #include <obd_cksum.h>
45 #include <lustre_ha.h>
46 #include <lprocfs_status.h>
47 #include <lustre_ioctl.h>
48 #include <lustre_debug.h>
49 #include <lustre_param.h>
50 #include <lustre_fid.h>
51 #include <obd_class.h>
52 #include "osc_internal.h"
53 #include "osc_cl_internal.h"
54
/* Per-RPC state for an asynchronous bulk read/write (BRW) request;
 * stored in ptlrpc_request::rq_async_args and consumed by brw_interpret(). */
struct osc_brw_async_args {
        struct obdo              *aa_oa;           /* object attributes sent/returned */
        int                       aa_requested_nob; /* number of bytes requested */
        int                       aa_nio_count;     /* niobuf/remote-buffer count */
        obd_count                 aa_page_count;    /* entries in aa_ppga */
        int                       aa_resends;       /* resend attempts so far */
        struct brw_page **aa_ppga;                  /* page descriptor array */
        struct client_obd        *aa_cli;           /* owning client obd */
        struct list_head          aa_oaps;          /* async pages in this RPC */
        struct list_head          aa_exts;          /* extents covered by this RPC */
        struct obd_capa  *aa_ocapa;                 /* capability, if any */
        struct cl_req            *aa_clerq;         /* cl_req for this transfer */
};

/* Grant-shrink RPCs reuse the BRW async-args layout (only aa_oa is used
 * there — see osc_shrink_grant_interpret()). */
#define osc_grant_args osc_brw_async_args
70
/* Async-args payload for simple getattr-style RPCs. */
struct osc_async_args {
        struct obd_info *aa_oi; /* caller's obd_info; oi_cb_up is the completion upcall */
};
74
/* Async-args payload for setattr/punch RPCs (see osc_setattr_interpret()). */
struct osc_setattr_args {
        struct obdo             *sa_oa;      /* attributes being set / updated from reply */
        obd_enqueue_update_f     sa_upcall;  /* completion callback */
        void                    *sa_cookie;  /* opaque cookie passed to sa_upcall */
};
80
/* Async-args payload for OST_SYNC RPCs (see osc_sync_interpret()). */
struct osc_fsync_args {
        struct obd_info *fa_oi;              /* caller's obd_info; oi_oa receives reply attrs */
        obd_enqueue_update_f     fa_upcall;  /* completion callback */
        void                    *fa_cookie;  /* opaque cookie passed to fa_upcall */
};
86
/* Async-args payload for DLM lock enqueue requests. */
struct osc_enqueue_args {
        struct obd_export               *oa_exp;    /* export the lock is taken on */
        __u64                           *oa_flags;  /* in/out LDLM flags */
        obd_enqueue_update_f             oa_upcall; /* completion callback */
        void                            *oa_cookie; /* opaque cookie for oa_upcall */
        struct ost_lvb                  *oa_lvb;    /* lock value block (size/times) */
        struct lustre_handle            *oa_lockh;  /* resulting lock handle */
        struct ldlm_enqueue_info        *oa_ei;     /* enqueue parameters */
        unsigned int                     oa_agl:1;  /* AGL (async glimpse lock) request */
};
97
/* Forward declarations for the bulk I/O path implemented later in this file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *data, int rc);
101
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Multiplexed behavior depending on the arguments:
 *   - lsmp == NULL:                size query; return the LSM size needed.
 *   - *lsmp != NULL && lmm == NULL: free the existing LSM and return 0.
 *   - *lsmp == NULL:               allocate a fresh single-stripe LSM.
 * When @lmm is supplied its object id is validated and copied (LE -> CPU)
 * into the LSM.  Returns the LSM size on success or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                /* reject a wire buffer too small to hold the header */
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                /* a zero object id is never valid on the wire */
                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* an OSC object always has exactly one stripe */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* existing LSM with no new metadata: caller wants it freed */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        /* undo the LSM allocation on partial failure */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                /* pre-existing LSM must already carry a valid object id */
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* maximum object size comes from the server when it advertises
         * OBD_CONNECT_MAXBYTES; otherwise fall back to the ext3 limit */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_EXT3_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
189                              oinfo->oi_oa);
190         osc_pack_capa(req, body, oinfo->oi_capa);
191 }
192
193 static inline void osc_set_capa_size(struct ptlrpc_request *req,
194                                      const struct req_msg_field *field,
195                                      struct obd_capa *oc)
196 {
197         if (oc == NULL)
198                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199         else
200                 /* it is already calculated as sizeof struct obd_capa */
201                 ;
202 }
203
/* Reply handler for an async OST_GETATTR: copy the returned attributes
 * into the caller's obdo and invoke the oi_cb_up completion callback. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                /* reply missing the body: invalidate the attrs we hold */
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* the completion upcall runs on both success and error paths */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
232
/* Queue an OST_GETATTR RPC on @set; the reply is handled by
 * osc_getattr_interpret(), which fires oinfo->oi_cb_up. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* size the capability field before packing the request buffers */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* stash the caller's obd_info in the request's async-args area */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
264
/* Synchronous OST_GETATTR: fetch object attributes into oinfo->oi_oa. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* the preferred I/O size is client-determined, not sent by the OST */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
308
/* Synchronous OST_SETATTR: push oinfo->oi_oa attributes to the OST and
 * copy the server's view of the attributes back on success. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        /* the object's group/sequence must already be known */
        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
350
/* Reply handler for async OST_SETATTR/OST_PUNCH: copy the returned
 * attributes into sa_oa and fire the sa_upcall completion. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* the completion upcall runs on both success and error paths */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
371
/* Send an OST_SETATTR asynchronously.
 *
 * With rqset == NULL the request is fired through ptlrpcd and the reply
 * is ignored entirely.  Otherwise osc_setattr_interpret() delivers the
 * result to @upcall(@cookie, rc), and the request is handed to ptlrpcd
 * when rqset == PTLRPCD_SET or added to @rqset for the caller to drive. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* carry the llog cookie from the transaction info, if present */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
422
423 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
424                              struct obd_trans_info *oti,
425                              struct ptlrpc_request_set *rqset)
426 {
427         return osc_setattr_async_base(exp, oinfo, oti,
428                                       oinfo->oi_cb_up, oinfo, rqset);
429 }
430
/* Synchronously create an object on the OST (OST_CREATE).
 *
 * @oa  attributes for the new object; updated from the server's reply.
 * @ea  in/out striping metadata; allocated here when *ea is NULL and
 *      freed again if the create fails in that case.
 * @oti optional transaction info; receives the unlink llog cookie when
 *      the server returns one (OBD_MD_FLCOOKIE). */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* no striping supplied: allocate a local LSM for the result */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* the preferred I/O size is client-determined, not sent by the OST */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (oti->oti_logcookies == NULL)
                                oti->oti_logcookies = &oti->oti_onecookie;

                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* free the locally-allocated lsm if the create never succeeded
         * (*ea is still NULL in that case) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
514
/* Send an OST_PUNCH (truncate) RPC asynchronously; the punch extent is
 * carried in oinfo->oi_oa.  Completion is delivered through
 * osc_setattr_interpret() -> @upcall(@cookie, rc).  The request goes to
 * ptlrpcd when rqset == PTLRPCD_SET, otherwise it is added to @rqset. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* punch replies carry attrs, so reuse the setattr interpret path */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
559
/* Reply handler for OST_SYNC: copy the returned attributes into the
 * caller's obdo and fire the fa_upcall completion. */
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        /* struct copy of the server-reported attributes */
        *fa->fa_oi->oi_oa = body->oa;
out:
        /* the completion upcall runs on both success and error paths */
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}
582
/* Send an OST_SYNC asynchronously; the sync range is encoded in the
 * size/blocks fields of oinfo->oi_oa (start/end), and completion is
 * delivered through osc_sync_interpret() -> @upcall(@cookie, rc). */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
627
/* Find and cancel locally locks matched by @mode in the resource found by
 * the object id in @oa. Found locks are added into the @cancels list.
 * Returns the number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                RETURN(0);

        /* look up the resource for this object; nothing to cancel if it
         * does not exist */
        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
662
663 static int osc_destroy_interpret(const struct lu_env *env,
664                                  struct ptlrpc_request *req, void *data,
665                                  int rc)
666 {
667         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
668
669         atomic_dec(&cli->cl_destroy_in_flight);
670         wake_up(&cli->cl_destroy_waitq);
671         return 0;
672 }
673
/* Throttle destroy RPCs to at most cl_max_rpcs_in_flight concurrently.
 * Returns 1 when a slot was reserved (counter incremented and kept),
 * 0 when no slot is available (the increment is rolled back). */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* over the limit: undo our increment; if a slot was freed between
         * the inc and the dec, wake a waiter so it can retry */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
691
692 int osc_create(const struct lu_env *env, struct obd_export *exp,
693                struct obdo *oa, struct lov_stripe_md **ea,
694                struct obd_trans_info *oti)
695 {
696         int rc = 0;
697         ENTRY;
698
699         LASSERT(oa);
700         LASSERT(ea);
701         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
702
703         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
704             oa->o_flags == OBD_FL_RECREATE_OBJS) {
705                 RETURN(osc_real_create(exp, oa, ea, oti));
706         }
707
708         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
709                 RETURN(osc_real_create(exp, oa, ea, oti));
710
711         /* we should not get here anymore */
712         LBUG();
713
714         RETURN(rc);
715 }
716
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct list_head       cancels = LIST_HEAD_INIT(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* collect cached PW locks on this object for early lock cancel,
         * discarding their data (the object is going away anyway) */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* drop the references taken by osc_resource_get_unused() */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* forward the unlink llog cookie, if any, to the OST */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan,
         * sent from MDT to OST, which should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block ptlrpcd thread (b=16006)*/
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        RETURN(0);
}
796
/* Fill @oa with this client's dirty-page and grant accounting so the
 * server can track outstanding cached writes.  o_undirty is forced to 0
 * whenever the accounting looks inconsistent; grant and lost-grant
 * counters are read/consumed under cl_loi_list_lock.
 * NOTE(review): @writing_bytes is not referenced in this body. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
        if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
                     cli->cl_dirty_max_pages)) {
                /* per-client dirty accounting exceeded its own limit */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages, cli->cl_dirty_transit,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
                            atomic_long_read(&obd_dirty_transit_pages) >
                            (obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
                       cli->cl_import->imp_obd->obd_name,
                       atomic_long_read(&obd_dirty_pages),
                       atomic_long_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                /* implausibly large headroom — looks like a wrapped counter */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
                                    max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported once, then reset */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
845
846 void osc_update_next_shrink(struct client_obd *cli)
847 {
848         cli->cl_next_shrink_grant =
849                 cfs_time_shift(cli->cl_grant_shrink_interval);
850         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
851                cli->cl_next_shrink_grant);
852 }
853
/* Add @grant bytes back to the client's locally available grant.
 * cl_loi_list_lock serializes all updates to cl_avail_grant. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
860
861 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
862 {
863         if (body->oa.o_valid & OBD_MD_FLGRANT) {
864                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
865                 __osc_update_grant(cli, body->oa.o_grant);
866         }
867 }
868
/* Forward declaration: osc_shrink_grant_to_target() below uses this to send
 * the KEY_GRANT_SHRINK request; the definition appears later in this file. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set);
872
/* Reply callback for a grant shrink RPC.  On failure the grant we tried to
 * give back (oa->o_grant) is restored to cl_avail_grant; on success any
 * extra grant returned by the server is absorbed.  In both cases the obdo
 * allocated for the request is freed here — this callback owns it. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* shrink RPC failed: take the grant back locally */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}
893
894 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
895 {
896         client_obd_list_lock(&cli->cl_loi_list_lock);
897         oa->o_grant = cli->cl_avail_grant / 4;
898         cli->cl_avail_grant -= oa->o_grant;
899         client_obd_list_unlock(&cli->cl_loi_list_lock);
900         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
901                 oa->o_valid |= OBD_MD_FLFLAGS;
902                 oa->o_flags = 0;
903         }
904         oa->o_flags |= OBD_FL_SHRINK_GRANT;
905         osc_update_next_shrink(cli);
906 }
907
908 /* Shrink the current grant, either from some large amount to enough for a
909  * full set of in-flight RPCs, or if we have already shrunk to that limit
910  * then to enough for a single RPC.  This avoids keeping more grant than
911  * needed, and avoids shrinking the grant piecemeal. */
912 static int osc_shrink_grant(struct client_obd *cli)
913 {
914         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
915                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
916
917         client_obd_list_lock(&cli->cl_loi_list_lock);
918         if (cli->cl_avail_grant <= target_bytes)
919                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
920         client_obd_list_unlock(&cli->cl_loi_list_lock);
921
922         return osc_shrink_grant_to_target(cli, target_bytes);
923 }
924
/* Shrink the client's available grant down to @target_bytes and hand the
 * difference back to the server via a KEY_GRANT_SHRINK set_info RPC.
 * The target is clamped so we never shrink below one full RPC's worth of
 * grant.  If sending the RPC fails, the grant is restored locally.
 * Returns 0 on success or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* NOTE(review): cl_avail_grant may have changed since the check
         * above (lock was dropped); re-read under the lock here — confirm
         * it cannot drop below target_bytes in between. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* RPC not sent: take the grant back locally */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
969
970 static int osc_should_shrink_grant(struct client_obd *client)
971 {
972         cfs_time_t time = cfs_time_current();
973         cfs_time_t next_shrink = client->cl_next_shrink_grant;
974
975         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
976              OBD_CONNECT_GRANT_SHRINK) == 0)
977                 return 0;
978
979         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
980                 /* Get the current RPC size directly, instead of going via:
981                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
982                  * Keep comment here so that it can be found by searching. */
983                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
984
985                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
986                     client->cl_avail_grant > brw_size)
987                         return 1;
988                 else
989                         osc_update_next_shrink(client);
990         }
991         return 0;
992 }
993
994 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
995 {
996         struct client_obd *client;
997
998         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
999                 if (osc_should_shrink_grant(client))
1000                         osc_shrink_grant(client);
1001         }
1002         return 0;
1003 }
1004
1005 static int osc_add_shrink_grant(struct client_obd *client)
1006 {
1007         int rc;
1008
1009         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1010                                        TIMEOUT_GRANT,
1011                                        osc_grant_shrink_grant_cb, NULL,
1012                                        &client->cl_grant_shrink_list);
1013         if (rc) {
1014                 CERROR("add grant client %s error %d\n",
1015                         client->cl_import->imp_obd->obd_name, rc);
1016                 return rc;
1017         }
1018         CDEBUG(D_CACHE, "add grant client %s \n",
1019                client->cl_import->imp_obd->obd_name);
1020         osc_update_next_shrink(client);
1021         return 0;
1022 }
1023
/* Unregister @client from the periodic grant-shrink timeout list. */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1029
/* Initialize this client's grant state from the server's connect data:
 * set cl_avail_grant from ocd_grant (accounting for pages already dirty
 * unless we were just evicted), derive the osc_extent chunk size from the
 * server block size, and register for grant shrinking if supported. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant -
                                      (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* register for periodic shrinking only once per client */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1069
1070 /* We assume that the reason this OSC got a short read is because it read
1071  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1072  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1073  * this stripe never got written at or beyond this stripe offset yet. */
1074 static void handle_short_read(int nob_read, obd_count page_count,
1075                               struct brw_page **pga)
1076 {
1077         char *ptr;
1078         int i = 0;
1079
1080         /* skip bytes read OK */
1081         while (nob_read > 0) {
1082                 LASSERT (page_count > 0);
1083
1084                 if (pga[i]->count > nob_read) {
1085                         /* EOF inside this page */
1086                         ptr = kmap(pga[i]->pg) +
1087                                 (pga[i]->off & ~CFS_PAGE_MASK);
1088                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1089                         kunmap(pga[i]->pg);
1090                         page_count--;
1091                         i++;
1092                         break;
1093                 }
1094
1095                 nob_read -= pga[i]->count;
1096                 page_count--;
1097                 i++;
1098         }
1099
1100         /* zero remaining pages */
1101         while (page_count-- > 0) {
1102                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1103                 memset(ptr, 0, pga[i]->count);
1104                 kunmap(pga[i]->pg);
1105                 i++;
1106         }
1107 }
1108
1109 static int check_write_rcs(struct ptlrpc_request *req,
1110                            int requested_nob, int niocount,
1111                            obd_count page_count, struct brw_page **pga)
1112 {
1113         int     i;
1114         __u32   *remote_rcs;
1115
1116         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1117                                                   sizeof(*remote_rcs) *
1118                                                   niocount);
1119         if (remote_rcs == NULL) {
1120                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1121                 return(-EPROTO);
1122         }
1123
1124         /* return error if any niobuf was in error */
1125         for (i = 0; i < niocount; i++) {
1126                 if ((int)remote_rcs[i] < 0)
1127                         return(remote_rcs[i]);
1128
1129                 if (remote_rcs[i] != 0) {
1130                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1131                                 i, remote_rcs[i], req);
1132                         return(-EPROTO);
1133                 }
1134         }
1135
1136         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1137                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1138                        req->rq_bulk->bd_nob_transferred, requested_nob);
1139                 return(-EPROTO);
1140         }
1141
1142         return (0);
1143 }
1144
1145 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1146 {
1147         if (p1->flag != p2->flag) {
1148                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1149                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1150                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1151
1152                 /* warn if we try to combine flags that we don't know to be
1153                  * safe to combine */
1154                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1155                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1156                               "report this at https://jira.hpdd.intel.com/\n",
1157                               p1->flag, p2->flag);
1158                 }
1159                 return 0;
1160         }
1161
1162         return (p1->off + p1->count == p2->off);
1163 }
1164
/* Compute the bulk checksum over the first @nob bytes of the @pga page
 * array using the algorithm selected by @cksum_type.  Also implements two
 * fault-injection points: corrupting read data before checksumming
 * (OBD_FAIL_OSC_CHECKSUM_RECEIVE) and returning a deliberately wrong
 * checksum for writes (OBD_FAIL_OSC_CHECKSUM_SEND).
 * Returns the checksum, or PTR_ERR() of the hash descriptor on init
 * failure (negative value stored in the unsigned return type). */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* hash at most the bytes remaining in this request */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                /* nob may go negative on the last (partial) page,
                 * which terminates the loop */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(cksum);
        /* NOTE(review): err is never checked — on hash-final failure the
         * returned cksum may be stale/uninitialized; confirm intended. */
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1219
/* Build a bulk read/write RPC for @page_count pages described by @pga:
 * allocate the request (from the pre-allocated pool for writes), merge
 * contiguous compatible pages into niobufs, set up the bulk descriptor,
 * piggy-back cached/grant accounting, optionally attach a bulk checksum,
 * and stash the async args used by the completion path.  On success the
 * request (not yet sent) is returned through @reqp and ownership of @oa
 * passes to the request's async args.  Returns 0 or a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many niobufs we need: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf for mergeable pages */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len    = pg->count;
                        niobuf->rnb_flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
                req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
                niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1426
/* Diagnose a write checksum mismatch: recompute the checksum over the
 * still-mapped pages and, from how it compares to the client's original
 * and the server's value, log which leg of the transfer likely corrupted
 * the data.  Returns 0 if the checksums actually match, 1 on mismatch. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the algorithm the server reported using */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1471
1472 /* Note rc enters this function as number of bytes transferred */
1473 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1474 {
1475         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1476         const lnet_process_id_t *peer =
1477                         &req->rq_import->imp_connection->c_peer;
1478         struct client_obd *cli = aa->aa_cli;
1479         struct ost_body *body;
1480         __u32 client_cksum = 0;
1481         ENTRY;
1482
1483         if (rc < 0 && rc != -EDQUOT) {
1484                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1485                 RETURN(rc);
1486         }
1487
1488         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1489         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1490         if (body == NULL) {
1491                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1492                 RETURN(-EPROTO);
1493         }
1494
1495         /* set/clear over quota flag for a uid/gid */
1496         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1497             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1498                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1499
1500                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1501                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1502                        body->oa.o_flags);
1503                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1504         }
1505
1506         osc_update_grant(cli, body);
1507
1508         if (rc < 0)
1509                 RETURN(rc);
1510
1511         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1512                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1513
1514         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1515                 if (rc > 0) {
1516                         CERROR("Unexpected +ve rc %d\n", rc);
1517                         RETURN(-EPROTO);
1518                 }
1519                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1520
1521                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1522                         RETURN(-EAGAIN);
1523
1524                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1525                     check_write_checksum(&body->oa, peer, client_cksum,
1526                                          body->oa.o_cksum, aa->aa_requested_nob,
1527                                          aa->aa_page_count, aa->aa_ppga,
1528                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1529                         RETURN(-EAGAIN);
1530
1531                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1532                                      aa->aa_page_count, aa->aa_ppga);
1533                 GOTO(out, rc);
1534         }
1535
1536         /* The rest of this function executes only for OST_READs */
1537
1538         /* if unwrap_bulk failed, return -EAGAIN to retry */
1539         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1540         if (rc < 0)
1541                 GOTO(out, rc = -EAGAIN);
1542
1543         if (rc > aa->aa_requested_nob) {
1544                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1545                        aa->aa_requested_nob);
1546                 RETURN(-EPROTO);
1547         }
1548
1549         if (rc != req->rq_bulk->bd_nob_transferred) {
1550                 CERROR ("Unexpected rc %d (%d transferred)\n",
1551                         rc, req->rq_bulk->bd_nob_transferred);
1552                 return (-EPROTO);
1553         }
1554
1555         if (rc < aa->aa_requested_nob)
1556                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1557
1558         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1559                 static int cksum_counter;
1560                 __u32      server_cksum = body->oa.o_cksum;
1561                 char      *via;
1562                 char      *router;
1563                 cksum_type_t cksum_type;
1564
1565                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1566                                                body->oa.o_flags : 0);
1567                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1568                                                  aa->aa_ppga, OST_READ,
1569                                                  cksum_type);
1570
1571                 if (peer->nid == req->rq_bulk->bd_sender) {
1572                         via = router = "";
1573                 } else {
1574                         via = " via ";
1575                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1576                 }
1577
1578                 if (server_cksum != client_cksum) {
1579                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1580                                            "%s%s%s inode "DFID" object "DOSTID
1581                                            " extent ["LPU64"-"LPU64"]\n",
1582                                            req->rq_import->imp_obd->obd_name,
1583                                            libcfs_nid2str(peer->nid),
1584                                            via, router,
1585                                            body->oa.o_valid & OBD_MD_FLFID ?
1586                                                 body->oa.o_parent_seq : (__u64)0,
1587                                            body->oa.o_valid & OBD_MD_FLFID ?
1588                                                 body->oa.o_parent_oid : 0,
1589                                            body->oa.o_valid & OBD_MD_FLFID ?
1590                                                 body->oa.o_parent_ver : 0,
1591                                            POSTID(&body->oa.o_oi),
1592                                            aa->aa_ppga[0]->off,
1593                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1594                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1595                                                                         1);
1596                         CERROR("client %x, server %x, cksum_type %x\n",
1597                                client_cksum, server_cksum, cksum_type);
1598                         cksum_counter = 0;
1599                         aa->aa_oa->o_cksum = client_cksum;
1600                         rc = -EAGAIN;
1601                 } else {
1602                         cksum_counter++;
1603                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1604                         rc = 0;
1605                 }
1606         } else if (unlikely(client_cksum)) {
1607                 static int cksum_missed;
1608
1609                 cksum_missed++;
1610                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1611                         CERROR("Checksum %u requested from %s but not sent\n",
1612                                cksum_missed, libcfs_nid2str(peer->nid));
1613         } else {
1614                 rc = 0;
1615         }
1616 out:
1617         if (rc >= 0)
1618                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1619                                      aa->aa_oa, &body->oa);
1620
1621         RETURN(rc);
1622 }
1623
/* Rebuild and resend a bulk BRW request after a recoverable error.
 *
 * A fresh request is prepared over the same page array, then ownership of
 * the async args (oaps, extents, capability, page array) is moved from the
 * old @request to the new one, which is finally queued on a ptlrpcd thread.
 *
 * \param request  the failed request being redone
 * \param aa       async args shared with the old request (modified in place)
 * \param rc       the error that triggered the redo (used for logging only)
 *
 * \retval 0        new request queued successfully
 * \retval -EINTR   a page in the old request was interrupted
 * \retval negative error from osc_brw_prep_request()
 */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS is an expected server-side retry hint, so log it
         * quietly; anything else is a real error worth D_ERROR. */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* Abort the resend if any page of the old request was interrupted;
         * the new request must be released since it was never queued. */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_req->rq_commit_cb = request->rq_commit_cb;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* Move the oap and extent lists; aa may alias new_aa's storage, so
         * list_splice_init() (not a plain copy) keeps both heads valid. */
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* Re-point the per-page request references at the new request. */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* Capability ownership moves to the new request's args. */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1697
1698 /*
1699  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1700  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1701  * fine for our small page arrays and doesn't require allocation.  its an
1702  * insertion sort that swaps elements that are strides apart, shrinking the
1703  * stride down until its '1' and the array is sorted.
1704  */
1705 static void sort_brw_pages(struct brw_page **array, int num)
1706 {
1707         int stride, i, j;
1708         struct brw_page *tmp;
1709
1710         if (num == 1)
1711                 return;
1712         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1713                 ;
1714
1715         do {
1716                 stride /= 3;
1717                 for (i = stride ; i < num ; i++) {
1718                         tmp = array[i];
1719                         j = i;
1720                         while (j >= stride && array[j - stride]->off > tmp->off) {
1721                                 array[j] = array[j - stride];
1722                                 j -= stride;
1723                         }
1724                         array[j] = tmp;
1725                 }
1726         } while (stride > 1);
1727 }
1728
1729 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1730 {
1731         LASSERT(ppga != NULL);
1732         OBD_FREE(ppga, sizeof(*ppga) * count);
1733 }
1734
/* Interpret callback for a bulk BRW request: finalize the reply, redo the
 * request on recoverable errors, propagate server-returned object
 * attributes into the cl_object, and tear down the extents/oaps/clerq
 * associated with this RPC.  Runs from a ptlrpc(d) context with @data
 * pointing at the request's osc_brw_async_args. */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct client_obd *cli = aa->aa_cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* Import was evicted/reconnected since this request
                         * was sent: do not resend, just report. */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                /* rc == 0 means the redo was queued; this request is done. */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        /* The capability reference is no longer needed once the RPC is
         * not going to be resent. */
        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        if (rc == 0) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;
                struct cl_object *obj;
                struct osc_async_page *last;

                /* All pages of this RPC belong to one object (see
                 * osc_build_rpc), so the last page identifies it. */
                last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
                obj = osc2cl(last->oap_obj);

                /* Copy attributes the server marked valid into the cached
                 * cl_object attributes, under the attr lock. */
                cl_object_attr_lock(obj);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }

                if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                        struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
                        loff_t last_off = last->oap_count + last->oap_obj_off +
                                last->oap_page_off;

                        /* Change file size if this is an out of quota or
                         * direct IO write and it extends the file size */
                        if (loi->loi_lvb.lvb_size < last_off) {
                                attr->cat_size = last_off;
                                valid |= CAT_SIZE;
                        }
                        /* Extend KMS if it's not a lockless write */
                        if (loi->loi_kms < last_off &&
                            oap2osc_page(last)->ops_srvlock == 0) {
                                attr->cat_kms = last_off;
                                valid |= CAT_KMS;
                        }
                }

                if (valid != 0)
                        cl_object_attr_set(env, obj, attr, valid);
                cl_object_attr_unlock(obj);
        }
        OBDO_FREE(aa->aa_oa);

        /* Successful writes leave pages "unstable" until the server commits
         * the transaction (see brw_commit()). */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* An RPC slot just freed up; try to kick off more queued IO. */
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
}
1858
1859 static void brw_commit(struct ptlrpc_request *req)
1860 {
1861         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
1862          * this called via the rq_commit_cb, I need to ensure
1863          * osc_dec_unstable_pages is still called. Otherwise unstable
1864          * pages may be leaked. */
1865         spin_lock(&req->rq_lock);
1866         if (likely(req->rq_unstable)) {
1867                 req->rq_unstable = 0;
1868                 spin_unlock(&req->rq_lock);
1869
1870                 osc_dec_unstable_pages(req);
1871         } else {
1872                 req->rq_committed = 1;
1873                 spin_unlock(&req->rq_lock);
1874         }
1875 }
1876
/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 *
 * On success the extents and their pages are handed off to the request's
 * async args and finished from brw_interpret(); on failure every extent in
 * @ext_list is finished with the error and all intermediate allocations
 * are released via the goto-cleanup path.
 *
 * \param env      lu environment of the caller
 * \param cli      client obd the RPC is sent through
 * \param ext_list extents to cover (spliced away on success)
 * \param cmd      OBD_BRW_READ or OBD_BRW_WRITE (plus flags)
 * \param pol      ptlrpcd scheduling policy for the async request
 *
 * \retval 0 on success, negative errno on failure
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
        struct ptlrpc_request           *req = NULL;
        struct osc_extent               *ext;
        struct brw_page                 **pga = NULL;
        struct osc_brw_async_args       *aa = NULL;
        struct obdo                     *oa = NULL;
        struct osc_async_page           *oap;
        struct osc_async_page           *tmp;
        struct cl_req                   *clerq = NULL;
        enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
                                                                      CRT_READ;
        struct cl_req_attr              *crattr = NULL;
        obd_off                         starting_offset = OBD_OBJECT_EOF;
        obd_off                         ending_offset = 0;
        int                             mpflag = 0;
        int                             mem_tight = 0;
        int                             page_count = 0;
        bool                            soft_sync = false;
        int                             i;
        int                             rc;
        struct list_head                rpc_list = LIST_HEAD_INIT(rpc_list);

        ENTRY;
        LASSERT(!list_empty(ext_list));

        /* add pages into rpc_list to build BRW rpc */
        list_for_each_entry(ext, ext_list, oe_link) {
                LASSERT(ext->oe_state == OES_RPC);
                mem_tight |= ext->oe_memalloc;
                list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
                        ++page_count;
                        list_add_tail(&oap->oap_rpc_item, &rpc_list);
                        /* Track the RPC's byte range; interior pages must be
                         * full pages (the LASSERTs check that only the first
                         * and last page may be partial). */
                        if (starting_offset > oap->oap_obj_off)
                                starting_offset = oap->oap_obj_off;
                        else
                                LASSERT(oap->oap_page_off == 0);
                        if (ending_offset < oap->oap_obj_off + oap->oap_count)
                                ending_offset = oap->oap_obj_off +
                                                oap->oap_count;
                        else
                                LASSERT(oap->oap_page_off + oap->oap_count ==
                                        PAGE_CACHE_SIZE);
                }
        }

        soft_sync = osc_over_unstable_soft_limit(cli);
        /* Allocations below must not recurse into reclaim when this RPC is
         * being built under memory pressure. */
        if (mem_tight)
                mpflag = cfs_memory_pressure_get_and_set();

        OBD_ALLOC(crattr, sizeof(*crattr));
        if (crattr == NULL)
                GOTO(out, rc = -ENOMEM);

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* Fill the brw_page array and attach each page to the cl_req. */
        i = 0;
        list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
                struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                }
                if (mem_tight)
                        oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                if (soft_sync)
                        oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, page_index(oap->oap_page), oap,
                       pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr->cra_oa = oa;
        cl_req_attr_set(env, clerq, crattr, ~0ULL);

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        /* Server-side allocation prefers offset order; see sort_brw_pages. */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                        pga, &req, crattr->cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_commit_cb = brw_commit;
        req->rq_interpret_reply = brw_interpret;

        if (mem_tight != 0)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice_init(&rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_exts);
        list_splice_init(ext_list, &aa->aa_exts);
        aa->aa_clerq = clerq;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                        oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        /* Account the in-flight RPC and feed the lprocfs histograms. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        starting_offset >>= PAGE_CACHE_SHIFT;
        if (cmd == OBD_BRW_READ) {
                cli->cl_r_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      starting_offset + 1);
        } else {
                cli->cl_w_in_flight++;
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      starting_offset + 1);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);

        /* XXX: Maybe the caller can check the RPC bulk descriptor to
         * see which CPU/NUMA node the majority of pages were allocated
         * on, and try to assign the async RPC to the CPU core
         * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
         *
         * But on the other hand, we expect that multiple ptlrpcd
         * threads and the initial write sponsor can run in parallel,
         * especially when data checksum is enabled, which is CPU-bound
         * operation and single ptlrpcd thread cannot process in time.
         * So more ptlrpcd threads sharing BRW load
         * (with PDL_POLICY_ROUND) seems better.
         */
        ptlrpcd_add_req(req, pol, -1);
        rc = 0;
        EXIT;

out:
        if (mem_tight != 0)
                cfs_memory_pressure_restore(mpflag);

        /* crattr is transient in all cases; its capability reference is
         * dropped here (osc_brw_prep_request took its own, if any). */
        if (crattr != NULL) {
                capa_put(crattr->cra_capa);
                OBD_FREE(crattr, sizeof(*crattr));
        }

        /* Error path: no request exists, so free what was allocated and
         * finish every extent with the error. */
        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                while (!list_empty(ext_list)) {
                        ext = list_entry(ext_list->next, struct osc_extent,
                                         oe_link);
                        list_del_init(&ext->oe_link);
                        osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        }
        RETURN(rc);
}
2092
2093 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2094                                         struct ldlm_enqueue_info *einfo)
2095 {
2096         void *data = einfo->ei_cbdata;
2097         int set = 0;
2098
2099         LASSERT(lock != NULL);
2100         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2101         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2102         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2103         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2104
2105         lock_res_and_lock(lock);
2106         spin_lock(&osc_ast_guard);
2107
2108         if (lock->l_ast_data == NULL)
2109                 lock->l_ast_data = data;
2110         if (lock->l_ast_data == data)
2111                 set = 1;
2112
2113         spin_unlock(&osc_ast_guard);
2114         unlock_res_and_lock(lock);
2115
2116         return set;
2117 }
2118
2119 static int osc_set_data_with_check(struct lustre_handle *lockh,
2120                                    struct ldlm_enqueue_info *einfo)
2121 {
2122         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2123         int set = 0;
2124
2125         if (lock != NULL) {
2126                 set = osc_set_lock_data_with_check(lock, einfo);
2127                 LDLM_LOCK_PUT(lock);
2128         } else
2129                 CERROR("lockh %p, data %p - client evicted?\n",
2130                        lockh, einfo->ei_cbdata);
2131         return set;
2132 }
2133
2134 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2135                              ldlm_iterator_t replace, void *data)
2136 {
2137         struct ldlm_res_id res_id;
2138         struct obd_device *obd = class_exp2obd(exp);
2139
2140         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2141         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2142         return 0;
2143 }
2144
2145 /* find any ldlm lock of the inode in osc
2146  * return 0    not find
2147  *        1    find one
2148  *      < 0    error */
2149 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2150                            ldlm_iterator_t replace, void *data)
2151 {
2152         struct ldlm_res_id res_id;
2153         struct obd_device *obd = class_exp2obd(exp);
2154         int rc = 0;
2155
2156         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2157         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2158         if (rc == LDLM_ITER_STOP)
2159                 return(1);
2160         if (rc == LDLM_ITER_CONTINUE)
2161                 return(0);
2162         return(rc);
2163 }
2164
/* Post-process an OSC enqueue reply and deliver the result to the caller.
 *
 * For intent enqueues that the server aborted, the real status is recovered
 * from the DLM reply's lock_policy_res1.  When the lock was granted (or an
 * aborted intent with agl == 0 still carries a usable LVB), the LVB is
 * marked ready in *flags.  Finally the caller's @upcall is invoked with
 * @cookie and the resulting status; its return value is what we return. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            __u64 *flags, int agl, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* lock_policy_res1 is in wire (network) status
                         * encoding; convert to a host errno. */
                        rep->lock_policy_res1 =
                                ptlrpc_status_ntoh(rep->lock_policy_res1);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
            (rc == 0)) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}
2198
/* Interpret callback for an asynchronous lock enqueue: completes the ldlm
 * enqueue, runs the osc-level fini/upcall, and carefully balances the lock
 * references so a late blocking AST cannot beat the upcall.
 *
 * NOTE(review): aa->oa_* may be freed by the upcall, hence the local copies
 * of the handle and mode taken up front. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* An aborted AGL (async glimpse) enqueue carries no LVB. */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above, plus the ldlm_handle2lock
         * reference on the lock itself. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2263
/* Sentinel rqset value: callers pass PTLRPCD_SET to osc_enqueue_base() to
 * request that the RPC be handed to ptlrpcd instead of a real request set.
 * It is never dereferenced, only compared against. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2265
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on an OST object, first trying to match a lock
 * already cached on the client.
 *
 * \param exp       export to the OST
 * \param res_id    resource the lock covers
 * \param flags     in/out LDLM flags; LDLM_FL_HAS_INTENT selects intent mode
 * \param policy    extent to lock; rounded out to page boundaries below
 * \param lvb       buffer receiving size/blocks/mtime from the server
 * \param kms_valid non-zero when cached locks may be trusted (see below)
 * \param upcall    completion callback run with the final result
 * \param cookie    opaque argument for \a upcall
 * \param einfo     lock type/mode and AST callbacks
 * \param lockh     returned lock handle
 * \param rqset     NULL for sync; a set or PTLRPCD_SET for async
 * \param async     passed through to ldlm_cli_enqueue()
 * \param agl       non-zero for asynchronous glimpse locks
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        /* AGL may match a not-yet-granted lock, so don't require LVB_READY. */
        __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        /* Intent enqueues carry an LVB in the reply, so the request must be
         * allocated and packed here rather than inside ldlm_cli_enqueue(). */
        if (intent) {
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
                if (rc < 0) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: run the upcall directly. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2416
2417 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2418                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2419                    __u64 *flags, void *data, struct lustre_handle *lockh,
2420                    int unref)
2421 {
2422         struct obd_device *obd = exp->exp_obd;
2423         __u64 lflags = *flags;
2424         ldlm_mode_t rc;
2425         ENTRY;
2426
2427         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2428                 RETURN(-EIO);
2429
2430         /* Filesystem lock extents are extended to page boundaries so that
2431          * dealing with the page cache is a little smoother */
2432         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2433         policy->l_extent.end |= ~CFS_PAGE_MASK;
2434
2435         /* Next, search for already existing extent locks that will cover us */
2436         /* If we're trying to read, we also search for an existing PW lock.  The
2437          * VFS and page cache already protect us locally, so lots of readers/
2438          * writers can share a single PW lock. */
2439         rc = mode;
2440         if (mode == LCK_PR)
2441                 rc |= LCK_PW;
2442         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2443                              res_id, type, policy, rc, lockh, unref);
2444         if (rc) {
2445                 if (data != NULL) {
2446                         if (!osc_set_data_with_check(lockh, data)) {
2447                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2448                                         ldlm_lock_decref(lockh, rc);
2449                                 RETURN(0);
2450                         }
2451                 }
2452                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2453                         ldlm_lock_addref(lockh, LCK_PR);
2454                         ldlm_lock_decref(lockh, LCK_PW);
2455                 }
2456                 RETURN(rc);
2457         }
2458         RETURN(rc);
2459 }
2460
2461 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2462 {
2463         ENTRY;
2464
2465         if (unlikely(mode == LCK_GROUP))
2466                 ldlm_lock_decref_and_cancel(lockh, mode);
2467         else
2468                 ldlm_lock_decref(lockh, mode);
2469
2470         RETURN(0);
2471 }
2472
/*
 * Interpret callback for an asynchronous OST_STATFS RPC: copy the reply
 * into the caller's obd_statfs buffer and run the oi_cb_up completion.
 */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc == -EBADR)
                /* The request has in fact never been sent
                 * due to issues at a higher level (LOV).
                 * Exit immediately since the caller is
                 * aware of the problem and takes care
                 * of the clean up */
                 RETURN(rc);

        /* A NODELAY statfs is best-effort: treat connection trouble as
         * success with whatever data the caller already has. */
        if ((rc == -ENOTCONN || rc == -EAGAIN) &&
            (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
                GOTO(out, rc = 0);

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2505
/*
 * Issue an asynchronous OST_STATFS RPC; the result is delivered through
 * oinfo->oi_cb_up by osc_statfs_interpret().
 *
 * \param exp     export to the OST
 * \param oinfo   carries the result buffer, flags, and completion callback
 * \param max_age currently unused in the RPC itself (see comment below)
 * \param rqset   request set the RPC is added to
 */
static int osc_statfs_async(struct obd_export *exp,
                            struct obd_info *oinfo, __u64 max_age,
                            struct ptlrpc_request_set *rqset)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for the RPC, to avoid
                 * deadlock: do not resend or delay. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2549
/*
 * Synchronous OST_STATFS: fetch filesystem statistics from the OST and
 * copy them into \a osfs.
 *
 * \retval 0 on success, negative errno otherwise (-ENODEV when the import
 *         is already gone, -EPROTO on a malformed reply).
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
                      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
        struct obd_device     *obd = class_exp2obd(exp);
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request might also come from lprocfs, so take a reference on
         * the import under cl_sem to synchronize with
         * client_disconnect_export().  Bug15684 */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for the RPC, to avoid
                 * deadlock: do not resend or delay. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
2613
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call so
 * it cannot be unloaded mid-ioctl.
 *
 * \retval 0 on success, -ENOTTY for unknown commands, negative errno
 *         otherwise.
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("%s: cannot get module '%s'\n", obd->obd_name,
                       module_name(THIS_MODULE));
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1, 0);
                /* positive return from recover_import is informational only */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, current_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
2653
/*
 * OSC obd_get_info() handler.  Supported keys:
 *  - KEY_LOCK_TO_STRIPE: always returns stripe 0 (one object per OSC);
 *  - KEY_LAST_ID:        fetches the last allocated object id from the OST;
 *  - KEY_FIEMAP:         forwards a fiemap request to the OST, taking a PR
 *                        extent lock first when FIEMAP_FLAG_SYNC is set.
 * Any other key returns -EINVAL.
 */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
                        obd_count keylen, void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                /* best-effort query: fail fast rather than block recovery */
                req->rq_no_delay = req->rq_no_resend = 1;
                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ll_fiemap_info_key *fm_key =
                                (struct ll_fiemap_info_key *)key;
                struct ldlm_res_id       res_id;
                ldlm_policy_data_t       policy;
                struct lustre_handle     lockh;
                ldlm_mode_t              mode = 0;
                struct ptlrpc_request   *req;
                struct ll_user_fiemap   *reply;
                char                    *tmp;
                int                      rc;

                if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
                        goto skip_locking;

                /* Round the requested range out to page boundaries for the
                 * lock extent; clamp the end at OBD_OBJECT_EOF on overflow. */
                policy.l_extent.start = fm_key->fiemap.fm_start &
                                                CFS_PAGE_MASK;

                if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
                    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
                        policy.l_extent.end = OBD_OBJECT_EOF;
                else
                        policy.l_extent.end = (fm_key->fiemap.fm_start +
                                fm_key->fiemap.fm_length +
                                PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

                ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED |
                                       LDLM_FL_LVB_READY,
                                       &res_id, LDLM_EXTENT, &policy,
                                       LCK_PR | LCK_PW, &lockh, 0);
                if (mode) { /* lock is cached on client */
                        if (mode != LCK_PR) {
                                /* convert our PW reference to PR */
                                ldlm_lock_addref(&lockh, LCK_PR);
                                ldlm_lock_decref(&lockh, LCK_PW);
                        }
                } else { /* no cached lock, needs acquire lock on server side */
                        fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
                        fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
                }

skip_locking:
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        GOTO(drop_lock, rc = -ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        GOTO(drop_lock, rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(fini_req, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(fini_req, rc = -EPROTO);

                memcpy(val, reply, *vallen);
fini_req:
                ptlrpc_req_finished(req);
drop_lock:
                if (mode)
                        ldlm_lock_decref(&lockh, LCK_PR);
                RETURN(rc);
        }

        RETURN(-EINVAL);
}
2789
/*
 * OSC obd_set_info_async() handler.  Keys handled locally on the client:
 * KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX, KEY_CACHE_SET,
 * KEY_CACHE_LRU_SHRINK.  All other keys are forwarded to the OST via an
 * OST_SET_INFO RPC; KEY_GRANT_SHRINK gets special request formatting and
 * is dispatched through ptlrpcd.
 */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              obd_count keylen, void *key, obd_count vallen,
                              void *val, struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_SET)) {
                struct client_obd *cli = &obd->u.cli;

                LASSERT(cli->cl_cache == NULL); /* only once */
                cli->cl_cache = (struct cl_client_cache *)val;
                atomic_inc(&cli->cl_cache->ccc_users);
                cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

                /* add this osc into entity list */
                LASSERT(list_empty(&cli->cl_lru_osc));
                spin_lock(&cli->cl_cache->ccc_lru_lock);
                list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
                spin_unlock(&cli->cl_cache->ccc_lru_lock);

                RETURN(0);
        }

        if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
                struct client_obd *cli = &obd->u.cli;
                /* shrink at most half of this OSC's LRU pages */
                long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
                long target = *(long *)val;

                nr = osc_lru_shrink(env, cli, min(nr, target), true);
                /* report back how many pages are still to be shrunk */
                *(long *)val -= nr;
                RETURN(0);
        }

        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
                                                &RQF_OST_SET_GRANT_INFO :
                                                &RQF_OBD_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        if (!KEY_IS(KEY_GRANT_SHRINK))
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                                     RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        /* grant-shrink carries its value as an ost_body, not a raw buffer */
        tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
                                                        &RMF_OST_BODY :
                                                        &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* keep a private copy of the obdo for the interpret callback;
                 * it is freed there */
                OBDO_ALLOC(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

        RETURN(0);
}
2907
/*
 * Reconnect hook: recompute the grant to request from the OST in the
 * connect data.  The client asks for its current outstanding grant
 * (available + dirty), or twice the BRW size when that is zero, and resets
 * the lost-grant accounting under cl_loi_list_lock.
 */
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                client_obd_list_lock(&cli->cl_loi_list_lock);
                data->ocd_grant = (cli->cl_avail_grant +
                                  (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
                                  2 * cli_brw_size(obd);
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant, lost_grant);
        }

        RETURN(0);
}
2934
/*
 * Disconnect the export and, once the import is fully gone, remove this
 * client from the grant-shrink list (ordering explained below).
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        int rc;

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! the pinger triggers the shrink.
         * So the osc should only be removed from the shrink list after we
         * are sure the import has been destroyed. BUG18662
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
2962
2963 static int osc_import_event(struct obd_device *obd,
2964                             struct obd_import *imp,
2965                             enum obd_import_event event)
2966 {
2967         struct client_obd *cli;
2968         int rc = 0;
2969
2970         ENTRY;
2971         LASSERT(imp->imp_obd == obd);
2972
2973         switch (event) {
2974         case IMP_EVENT_DISCON: {
2975                 cli = &obd->u.cli;
2976                 client_obd_list_lock(&cli->cl_loi_list_lock);
2977                 cli->cl_avail_grant = 0;
2978                 cli->cl_lost_grant = 0;
2979                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2980                 break;
2981         }
2982         case IMP_EVENT_INACTIVE: {
2983                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
2984                 break;
2985         }
2986         case IMP_EVENT_INVALIDATE: {
2987                 struct ldlm_namespace *ns = obd->obd_namespace;
2988                 struct lu_env         *env;
2989                 int                    refcheck;
2990
2991                 env = cl_env_get(&refcheck);
2992                 if (!IS_ERR(env)) {
2993                         /* Reset grants */
2994                         cli = &obd->u.cli;
2995                         /* all pages go to failing rpcs due to the invalid
2996                          * import */
2997                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
2998
2999                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3000                         cl_env_put(env, &refcheck);
3001                 } else
3002                         rc = PTR_ERR(env);
3003                 break;
3004         }
3005         case IMP_EVENT_ACTIVE: {
3006                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3007                 break;
3008         }
3009         case IMP_EVENT_OCD: {
3010                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3011
3012                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3013                         osc_init_grant(&obd->u.cli, ocd);
3014
3015                 /* See bug 7198 */
3016                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3017                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3018
3019                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3020                 break;
3021         }
3022         case IMP_EVENT_DEACTIVATE: {
3023                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3024                 break;
3025         }
3026         case IMP_EVENT_ACTIVATE: {
3027                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3028                 break;
3029         }
3030         default:
3031                 CERROR("Unknown import event %d\n", event);
3032                 LBUG();
3033         }
3034         RETURN(rc);
3035 }
3036
3037 /**
3038  * Determine whether the lock can be canceled before replaying the lock
3039  * during recovery, see bug16774 for detailed information.
3040  *
3041  * \retval zero the lock can't be canceled
3042  * \retval other ok to cancel
3043  */
3044 static int osc_cancel_weight(struct ldlm_lock *lock)
3045 {
3046         /*
3047          * Cancel all unused and granted extent lock.
3048          */
3049         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3050             lock->l_granted_mode == lock->l_req_mode &&
3051             osc_ldlm_weigh_ast(lock) == 0)
3052                 RETURN(1);
3053
3054         RETURN(0);
3055 }
3056
3057 static int brw_queue_work(const struct lu_env *env, void *data)
3058 {
3059         struct client_obd *cli = data;
3060
3061         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3062
3063         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3064         RETURN(0);
3065 }
3066
/**
 * Set up an OSC obd_device from its configuration record.
 *
 * Sequence: take a ptlrpcd reference, run generic client setup, allocate
 * the writeback and LRU ptlrpcd work items, set up quota, register proc
 * entries, pre-allocate a request pool, and hook the lock-cancel weight
 * callback into the namespace.  Proc registration failure is deliberately
 * non-fatal: only the seqstat/sptlrpc attach is skipped.
 *
 * \retval 0 on success; negative errno on failure (all partially acquired
 *         resources are released via the goto-cleanup chain below).
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct client_obd *cli = &obd->u.cli;
        struct obd_type   *type;
        void              *handler;
        int                rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc)
                GOTO(out_ptlrpcd, rc);

        /* Deferred-writeback work item, run from ptlrpcd context. */
        handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_client_setup, rc = PTR_ERR(handler));
        cli->cl_writeback_work = handler;

        /* LRU page-reclaim work item, also run from ptlrpcd context. */
        handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
        if (IS_ERR(handler))
                GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
        cli->cl_lru_work = handler;

        rc = osc_quota_setup(obd);
        if (rc)
                GOTO(out_ptlrpcd_work, rc);

        cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;

#ifdef LPROCFS
        obd->obd_vars = lprocfs_osc_obd_vars;
#endif
        /* If this is true then both client (osc) and server (osp) are on the
         * same node. The osp layer if loaded first will register the osc proc
         * directory. In that case this obd_device will be attached its proc
         * tree to type->typ_procsym instead of obd->obd_type->typ_procroot. */
        type = class_search_type(LUSTRE_OSP_NAME);
        if (type && type->typ_procsym) {
                obd->obd_proc_entry = lprocfs_seq_register(obd->obd_name,
                                                           type->typ_procsym,
                                                           obd->obd_vars, obd);
                if (IS_ERR(obd->obd_proc_entry)) {
                        rc = PTR_ERR(obd->obd_proc_entry);
                        CERROR("error %d setting up lprocfs for %s\n", rc,
                               obd->obd_name);
                        obd->obd_proc_entry = NULL;
                }
        } else {
                rc = lprocfs_obd_setup(obd);
        }

        /* If the basic OSC proc tree construction succeeded then
         * lets do the rest.  A non-zero rc here is NOT propagated:
         * proc failure is non-fatal for setup. */
        if (rc == 0) {
                lproc_osc_attach_seqstat(obd);
                sptlrpc_lprocfs_cliobd_attach(obd);
                ptlrpc_lprocfs_register_obd(obd);
        }

        /* We need to allocate a few requests more, because
         * brw_interpret tries to create new requests before freeing
         * previous ones, Ideally we want to have 2x max_rpcs_in_flight
         * reserved, but I'm afraid that might be too much wasted RAM
         * in fact, so 2 is just my guess and still should work. */
        cli->cl_import->imp_rq_pool =
                ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                    OST_MAXREQSIZE,
                                    ptlrpc_add_rqs_to_pool);

        INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
        /* Let the DLM know which locks are cheap to cancel for replay. */
        ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
        RETURN(0);

out_ptlrpcd_work:
        if (cli->cl_writeback_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_writeback_work);
                cli->cl_writeback_work = NULL;
        }
        if (cli->cl_lru_work != NULL) {
                ptlrpcd_destroy_work(cli->cl_lru_work);
                cli->cl_lru_work = NULL;
        }
out_client_setup:
        client_obd_cleanup(obd);
out_ptlrpcd:
        ptlrpcd_decref();
        RETURN(rc);
}
3158
3159 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3160 {
3161         int rc = 0;
3162         ENTRY;
3163
3164         switch (stage) {
3165         case OBD_CLEANUP_EARLY: {
3166                 struct obd_import *imp;
3167                 imp = obd->u.cli.cl_import;
3168                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3169                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3170                 ptlrpc_deactivate_import(imp);
3171                 spin_lock(&imp->imp_lock);
3172                 imp->imp_pingable = 0;
3173                 spin_unlock(&imp->imp_lock);
3174                 break;
3175         }
3176         case OBD_CLEANUP_EXPORTS: {
3177                 struct client_obd *cli = &obd->u.cli;
3178                 /* LU-464
3179                  * for echo client, export may be on zombie list, wait for
3180                  * zombie thread to cull it, because cli.cl_import will be
3181                  * cleared in client_disconnect_export():
3182                  *   class_export_destroy() -> obd_cleanup() ->
3183                  *   echo_device_free() -> echo_client_cleanup() ->
3184                  *   obd_disconnect() -> osc_disconnect() ->
3185                  *   client_disconnect_export()
3186                  */
3187                 obd_zombie_barrier();
3188                 if (cli->cl_writeback_work) {
3189                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3190                         cli->cl_writeback_work = NULL;
3191                 }
3192                 if (cli->cl_lru_work) {
3193                         ptlrpcd_destroy_work(cli->cl_lru_work);
3194                         cli->cl_lru_work = NULL;
3195                 }
3196                 obd_cleanup_client_import(obd);
3197                 ptlrpc_lprocfs_unregister_obd(obd);
3198                 lprocfs_obd_cleanup(obd);
3199                 break;
3200                 }
3201         }
3202         RETURN(rc);
3203 }
3204
3205 int osc_cleanup(struct obd_device *obd)
3206 {
3207         struct client_obd *cli = &obd->u.cli;
3208         int rc;
3209
3210         ENTRY;
3211
3212         /* lru cleanup */
3213         if (cli->cl_cache != NULL) {
3214                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3215                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3216                 list_del_init(&cli->cl_lru_osc);
3217                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3218                 cli->cl_lru_left = NULL;
3219                 atomic_dec(&cli->cl_cache->ccc_users);
3220                 cli->cl_cache = NULL;
3221         }
3222
3223         /* free memory of osc quota cache */
3224         osc_quota_cleanup(obd);
3225
3226         rc = client_obd_cleanup(obd);
3227
3228         ptlrpcd_decref();
3229         RETURN(rc);
3230 }
3231
3232 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3233 {
3234         int rc = class_process_proc_param(PARAM_OSC, obd->obd_vars, lcfg, obd);
3235         return rc > 0 ? 0: rc;
3236 }
3237
3238 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3239 {
3240         return osc_process_config_base(obd, buf);
3241 }
3242
/* Method table wiring the generic obd layer to the OSC implementations
 * above; connection management is delegated to the shared client code. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_unpackmd             = osc_unpackmd,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_change_cbdata        = osc_change_cbdata,
        .o_find_cbdata          = osc_find_cbdata,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_process_config       = osc_process_config,
        .o_quotactl             = osc_quotactl,
        .o_quotacheck           = osc_quotacheck,
};
3272
3273 extern struct lu_kmem_descr osc_caches[];
3274 extern spinlock_t osc_ast_guard;
3275 extern struct lock_class_key osc_ast_guard_class;
3276
3277 int __init osc_init(void)
3278 {
3279         bool enable_proc = true;
3280         struct obd_type *type;
3281         int rc;
3282         ENTRY;
3283
3284         /* print an address of _any_ initialized kernel symbol from this
3285          * module, to allow debugging with gdb that doesn't support data
3286          * symbols from modules.*/
3287         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3288
3289         rc = lu_kmem_init(osc_caches);
3290         if (rc)
3291                 RETURN(rc);
3292
3293         type = class_search_type(LUSTRE_OSP_NAME);
3294         if (type != NULL && type->typ_procsym != NULL)
3295                 enable_proc = false;
3296
3297         rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3298                                  LUSTRE_OSC_NAME, &osc_device_type);
3299         if (rc) {
3300                 lu_kmem_fini(osc_caches);
3301                 RETURN(rc);
3302         }
3303
3304         spin_lock_init(&osc_ast_guard);
3305         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3306
3307         RETURN(rc);
3308 }
3309
3310 static void /*__exit*/ osc_exit(void)
3311 {
3312         class_unregister_type(LUSTRE_OSC_NAME);
3313         lu_kmem_fini(osc_caches);
3314 }
3315
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Hook osc_init/osc_exit into the module load/unload machinery. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);