Whamcloud - gitweb
LU-3963 client: move llite,lov,target,obdecho to linux list api
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37
38 struct dt_update_request*
39 out_find_update(struct thandle_update *tu, struct dt_device *dt_dev)
40 {
41         struct dt_update_request   *dt_update;
42
43         list_for_each_entry(dt_update, &tu->tu_remote_update_list,
44                             dur_list) {
45                 if (dt_update->dur_dt == dt_dev)
46                         return dt_update;
47         }
48         return NULL;
49 }
50 EXPORT_SYMBOL(out_find_update);
51
52 void out_destroy_update_req(struct dt_update_request *dt_update)
53 {
54         if (dt_update == NULL)
55                 return;
56
57         list_del(&dt_update->dur_list);
58         if (dt_update->dur_req != NULL)
59                 OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len);
60
61         OBD_FREE_PTR(dt_update);
62         return;
63 }
64 EXPORT_SYMBOL(out_destroy_update_req);
65
66 struct dt_update_request *out_create_update_req(struct dt_device *dt)
67 {
68         struct dt_update_request *dt_update;
69
70         OBD_ALLOC_PTR(dt_update);
71         if (!dt_update)
72                 return ERR_PTR(-ENOMEM);
73
74         OBD_ALLOC_LARGE(dt_update->dur_req, OUT_UPDATE_INIT_BUFFER_SIZE);
75         if (dt_update->dur_req == NULL) {
76                 OBD_FREE_PTR(dt_update);
77                 return ERR_PTR(-ENOMEM);
78         }
79
80         dt_update->dur_req_len = OUT_UPDATE_INIT_BUFFER_SIZE;
81         INIT_LIST_HEAD(&dt_update->dur_list);
82         dt_update->dur_dt = dt;
83         dt_update->dur_batchid = 0;
84         dt_update->dur_req->ourq_magic = UPDATE_REQUEST_MAGIC;
85         dt_update->dur_req->ourq_count = 0;
86         INIT_LIST_HEAD(&dt_update->dur_cb_items);
87
88         return dt_update;
89 }
90 EXPORT_SYMBOL(out_create_update_req);
91
92 /**
93  * Find or create one loc in th_dev/dev_obj_update for the update,
94  * Because only one thread can access this thandle, no need
95  * lock now.
96  */
97 struct dt_update_request *out_find_create_update_loc(struct thandle *th,
98                                                   struct dt_object *dt)
99 {
100         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
101         struct thandle_update   *tu = th->th_update;
102         struct dt_update_request *update;
103         ENTRY;
104
105         if (tu == NULL) {
106                 OBD_ALLOC_PTR(tu);
107                 if (tu == NULL)
108                         RETURN(ERR_PTR(-ENOMEM));
109
110                 INIT_LIST_HEAD(&tu->tu_remote_update_list);
111                 tu->tu_sent_after_local_trans = 0;
112                 th->th_update = tu;
113         }
114
115         update = out_find_update(tu, dt_dev);
116         if (update != NULL)
117                 RETURN(update);
118
119         update = out_create_update_req(dt_dev);
120         if (IS_ERR(update))
121                 RETURN(update);
122
123         list_add_tail(&update->dur_list, &tu->tu_remote_update_list);
124
125         if (!tu->tu_only_remote_trans)
126                 thandle_get(th);
127
128         RETURN(update);
129 }
130 EXPORT_SYMBOL(out_find_create_update_loc);
131
132 int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
133                         const struct object_update_request *ureq,
134                         struct ptlrpc_request **reqp)
135 {
136         struct ptlrpc_request           *req;
137         struct object_update_request    *tmp;
138         int                             ureq_len;
139         int                             rc;
140         ENTRY;
141
142         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
143         if (req == NULL)
144                 RETURN(-ENOMEM);
145
146         ureq_len = object_update_request_size(ureq);
147         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
148                              ureq_len);
149
150         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
151         if (rc != 0) {
152                 ptlrpc_req_finished(req);
153                 RETURN(rc);
154         }
155
156         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
157                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
158
159         tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
160         memcpy(tmp, ureq, ureq_len);
161
162         ptlrpc_request_set_replen(req);
163         req->rq_request_portal = OUT_PORTAL;
164         req->rq_reply_portal = OSC_REPLY_PORTAL;
165         *reqp = req;
166
167         RETURN(rc);
168 }
169 EXPORT_SYMBOL(out_prep_update_req);
170
171 int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
172                     struct dt_update_request *dt_update,
173                     struct ptlrpc_request **reqp)
174 {
175         struct ptlrpc_request   *req = NULL;
176         int                     rc;
177         ENTRY;
178
179         rc = out_prep_update_req(env, imp, dt_update->dur_req, &req);
180         if (rc != 0)
181                 RETURN(rc);
182
183         /* Note: some dt index api might return non-zero result here, like
184          * osd_index_ea_lookup, so we should only check rc < 0 here */
185         rc = ptlrpc_queue_wait(req);
186         if (rc < 0) {
187                 ptlrpc_req_finished(req);
188                 dt_update->dur_rc = rc;
189                 RETURN(rc);
190         }
191
192         if (reqp != NULL) {
193                 *reqp = req;
194                 RETURN(rc);
195         }
196
197         dt_update->dur_rc = rc;
198
199         ptlrpc_req_finished(req);
200
201         RETURN(rc);
202 }
203 EXPORT_SYMBOL(out_remote_sync);
204
205 static int out_resize_update_req(struct dt_update_request *dt_update,
206                                  int new_size)
207 {
208         struct object_update_request *ureq;
209
210         LASSERT(new_size > dt_update->dur_req_len);
211
212         CDEBUG(D_INFO, "%s: resize update_size from %d to %d\n",
213                dt_update->dur_dt->dd_lu_dev.ld_obd->obd_name,
214                dt_update->dur_req_len, new_size);
215
216         OBD_ALLOC_LARGE(ureq, new_size);
217         if (ureq == NULL)
218                 return -ENOMEM;
219
220         memcpy(ureq, dt_update->dur_req,
221                object_update_request_size(dt_update->dur_req));
222
223         OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len);
224
225         dt_update->dur_req = ureq;
226         dt_update->dur_req_len = new_size;
227
228         return 0;
229 }
230
231 #define OUT_UPDATE_BUFFER_SIZE_ADD      4096
232 #define OUT_UPDATE_BUFFER_SIZE_MAX      (64 * 4096)  /* 64KB update size now */
233 /**
234  * Insert the update into the th_bufs for the device.
235  */
236
237 int out_insert_update(const struct lu_env *env,
238                       struct dt_update_request *update, int op,
239                       const struct lu_fid *fid, int params_count, int *lens,
240                       const char **bufs)
241 {
242         struct object_update_request    *ureq = update->dur_req;
243         int                             ureq_len;
244         struct object_update            *obj_update;
245         struct object_update_param      *param;
246         int                             update_length;
247         int                             rc = 0;
248         char                            *ptr;
249         int                             i;
250         ENTRY;
251
252         /* Check update size to make sure it can fit into the buffer */
253         ureq_len = object_update_request_size(ureq);
254         update_length = offsetof(struct object_update, ou_params[0]);
255         for (i = 0; i < params_count; i++)
256                 update_length += cfs_size_round(lens[i] + sizeof(*param));
257
258         if (unlikely(cfs_size_round(ureq_len + update_length) >
259                      update->dur_req_len)) {
260                 int new_size = update->dur_req_len;
261
262                 /* enlarge object update request size */
263                 while (new_size <
264                        cfs_size_round(ureq_len + update_length))
265                         new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
266                 if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
267                         RETURN(-E2BIG);
268
269                 rc = out_resize_update_req(update, new_size);
270                 if (rc != 0)
271                         RETURN(rc);
272
273                 ureq = update->dur_req;
274         }
275
276         /* fill the update into the update buffer */
277         obj_update = (struct object_update *)((char *)ureq + ureq_len);
278         obj_update->ou_fid = *fid;
279         obj_update->ou_type = op;
280         obj_update->ou_params_count = (__u16)params_count;
281         obj_update->ou_batchid = update->dur_batchid;
282         param = &obj_update->ou_params[0];
283         for (i = 0; i < params_count; i++) {
284                 param->oup_len = lens[i];
285                 ptr = &param->oup_buf[0];
286                 memcpy(&param->oup_buf[0], bufs[i], lens[i]);
287                 param = (struct object_update_param *)((char *)param +
288                          object_update_param_size(param));
289         }
290
291         ureq->ourq_count++;
292
293         CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%d\n",
294                update->dur_dt->dd_lu_dev.ld_obd->obd_name, ureq, PFID(fid),
295                ureq->ourq_count, op, params_count, ureq_len + update_length);
296
297         RETURN(rc);
298 }
299 EXPORT_SYMBOL(out_insert_update);