Whamcloud - gitweb
LU-4975 ofd: documenting the ofd_fs.c
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "osp_internal.h"
35
36 struct osp_async_update_args {
37         struct dt_update_request *oaua_update;
38         bool                      oaua_flow_control;
39 };
40
41 struct osp_async_update_item {
42         struct list_head                 oaui_list;
43         struct osp_object               *oaui_obj;
44         void                            *oaui_data;
45         osp_async_update_interpterer_t   oaui_interpterer;
46 };
47
48 static struct osp_async_update_item *
49 osp_async_update_item_init(struct osp_object *obj, void *data,
50                            osp_async_update_interpterer_t interpterer)
51 {
52         struct osp_async_update_item *oaui;
53
54         OBD_ALLOC_PTR(oaui);
55         if (oaui == NULL)
56                 return NULL;
57
58         lu_object_get(osp2lu_obj(obj));
59         INIT_LIST_HEAD(&oaui->oaui_list);
60         oaui->oaui_obj = obj;
61         oaui->oaui_data = data;
62         oaui->oaui_interpterer = interpterer;
63
64         return oaui;
65 }
66
67 static void osp_async_update_item_fini(const struct lu_env *env,
68                                        struct osp_async_update_item *oaui)
69 {
70         LASSERT(list_empty(&oaui->oaui_list));
71
72         lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
73         OBD_FREE_PTR(oaui);
74 }
75
76 static int osp_async_update_interpret(const struct lu_env *env,
77                                       struct ptlrpc_request *req,
78                                       void *arg, int rc)
79 {
80         struct object_update_reply      *reply  = NULL;
81         struct osp_async_update_args    *oaua   = arg;
82         struct dt_update_request        *dt_update = oaua->oaua_update;
83         struct osp_async_update_item    *oaui;
84         struct osp_async_update_item    *next;
85         int                              count  = 0;
86         int                              index  = 0;
87         int                              rc1    = 0;
88
89         if (oaua->oaua_flow_control)
90                 obd_put_request_slot(
91                                 &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
92
93         if (rc == 0 || req->rq_repmsg != NULL) {
94                 reply = req_capsule_server_sized_get(&req->rq_pill,
95                                                      &RMF_OUT_UPDATE_REPLY,
96                                                      OUT_UPDATE_REPLY_SIZE);
97                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
98                         rc1 = -EPROTO;
99                 else
100                         count = reply->ourp_count;
101         } else {
102                 rc1 = rc;
103         }
104
105         list_for_each_entry_safe(oaui, next, &dt_update->dur_cb_items,
106                                  oaui_list) {
107                 list_del_init(&oaui->oaui_list);
108                 if (index < count && reply->ourp_lens[index] > 0) {
109                         struct object_update_result *result;
110
111                         result = object_update_result_get(reply, index, NULL);
112                         if (result == NULL)
113                                 rc1 = -EPROTO;
114                         else
115                                 rc1 = result->our_rc;
116                 } else {
117                         rc1 = rc;
118                         if (unlikely(rc1 == 0))
119                                 rc1 = -EINVAL;
120                 }
121
122                 oaui->oaui_interpterer(env, reply, req, oaui->oaui_obj,
123                                        oaui->oaui_data, index, rc1);
124                 osp_async_update_item_fini(env, oaui);
125                 index++;
126         }
127
128         out_destroy_update_req(dt_update);
129
130         return 0;
131 }
132
133 int osp_unplug_async_update(const struct lu_env *env,
134                             struct osp_device *osp,
135                             struct dt_update_request *update)
136 {
137         struct osp_async_update_args    *args;
138         struct ptlrpc_request           *req = NULL;
139         int                              rc;
140
141         rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
142                                  update->dur_req, &req);
143         if (rc != 0) {
144                 struct osp_async_update_item *oaui;
145                 struct osp_async_update_item *next;
146
147                 list_for_each_entry_safe(oaui, next,
148                                          &update->dur_cb_items, oaui_list) {
149                         list_del_init(&oaui->oaui_list);
150                         oaui->oaui_interpterer(env, NULL, NULL, oaui->oaui_obj,
151                                                oaui->oaui_data, 0, rc);
152                         osp_async_update_item_fini(env, oaui);
153                 }
154                 out_destroy_update_req(update);
155         } else {
156                 LASSERT(list_empty(&update->dur_list));
157
158                 args = ptlrpc_req_async_args(req);
159                 args->oaua_update = update;
160                 req->rq_interpret_reply = osp_async_update_interpret;
161                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
162         }
163
164         return rc;
165 }
166
167 /* with osp::opd_async_requests_mutex held */
168 struct dt_update_request *
169 osp_find_or_create_async_update_request(struct osp_device *osp)
170 {
171         struct dt_update_request *update = osp->opd_async_requests;
172
173         if (update != NULL)
174                 return update;
175
176         update = out_create_update_req(&osp->opd_dt_dev);
177         if (!IS_ERR(update))
178                 osp->opd_async_requests = update;
179
180         return update;
181 }
182
183 /* with osp::opd_async_requests_mutex held */
184 int osp_insert_async_update(const struct lu_env *env,
185                             struct dt_update_request *update, int op,
186                             struct osp_object *obj, int count,
187                             int *lens, const char **bufs, void *data,
188                             osp_async_update_interpterer_t interpterer)
189 {
190         struct osp_async_update_item *oaui;
191         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
192         int                           rc  = 0;
193         ENTRY;
194
195         oaui = osp_async_update_item_init(obj, data, interpterer);
196         if (oaui == NULL)
197                 RETURN(-ENOMEM);
198
199 again:
200         rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
201                                count, lens, bufs);
202         if (rc == -E2BIG) {
203                 osp->opd_async_requests = NULL;
204                 mutex_unlock(&osp->opd_async_requests_mutex);
205
206                 rc = osp_unplug_async_update(env, osp, update);
207                 mutex_lock(&osp->opd_async_requests_mutex);
208                 if (rc != 0)
209                         GOTO(out, rc);
210
211                 update = osp_find_or_create_async_update_request(osp);
212                 if (IS_ERR(update))
213                         GOTO(out, rc = PTR_ERR(update));
214
215                 goto again;
216         }
217
218         if (rc == 0)
219                 list_add_tail(&oaui->oaui_list, &update->dur_cb_items);
220
221         GOTO(out, rc);
222
223 out:
224         if (rc != 0)
225                 osp_async_update_item_fini(env, oaui);
226
227         return rc;
228 }
229
230 /**
231  * If the transaction creation goes to OSP, it means the update
232  * in this transaction only includes remote UPDATE. It is only
233  * used by LFSCK right now.
234  **/
235 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
236 {
237         struct thandle *th = NULL;
238         struct thandle_update *tu = NULL;
239         int rc = 0;
240
241         OBD_ALLOC_PTR(th);
242         if (unlikely(th == NULL))
243                 GOTO(out, rc = -ENOMEM);
244
245         th->th_dev = d;
246         th->th_tags = LCT_TX_HANDLE;
247         atomic_set(&th->th_refc, 1);
248         th->th_alloc_size = sizeof(*th);
249
250         OBD_ALLOC_PTR(tu);
251         if (tu == NULL)
252                 GOTO(out, rc = -ENOMEM);
253
254         INIT_LIST_HEAD(&tu->tu_remote_update_list);
255         tu->tu_only_remote_trans = 1;
256         th->th_update = tu;
257
258 out:
259         if (rc != 0) {
260                 if (tu != NULL)
261                         OBD_FREE_PTR(tu);
262                 if (th != NULL)
263                         OBD_FREE_PTR(th);
264                 th = ERR_PTR(rc);
265         }
266
267         return th;
268 }
269
270 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
271                              struct dt_update_request *dt_update,
272                              struct thandle *th, bool flow_control)
273 {
274         struct thandle_update   *tu = th->th_update;
275         int                     rc = 0;
276
277         LASSERT(tu != NULL);
278
279         /* If the transaction only includes remote update, it should
280          * still be asynchronous */
281         if (is_only_remote_trans(th)) {
282                 struct osp_async_update_args    *args;
283                 struct ptlrpc_request           *req;
284
285                 list_del_init(&dt_update->dur_list);
286                 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
287                                          dt_update->dur_req, &req);
288                 if (rc == 0) {
289                         args = ptlrpc_req_async_args(req);
290                         args->oaua_update = dt_update;
291                         args->oaua_flow_control = flow_control;
292                         req->rq_interpret_reply =
293                                 osp_async_update_interpret;
294                         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
295                 } else {
296                         out_destroy_update_req(dt_update);
297                 }
298         } else {
299                 /* Before we support async update, the cross MDT transaction
300                  * has to been synchronized */
301                 th->th_sync = 1;
302                 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
303                                      dt_update, NULL);
304         }
305
306         return rc;
307 }
308
309 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
310                     struct thandle *th)
311 {
312         struct thandle_update *tu = th->th_update;
313         struct dt_update_request *dt_update;
314         int rc = 0;
315
316         if (tu == NULL)
317                 return rc;
318
319         /* Check whether there are updates related with this OSP */
320         dt_update = out_find_update(tu, dt);
321         if (dt_update == NULL)
322                 return rc;
323
324         /* Note: some updates needs to send before local transaction,
325          * some needs to send after local transaction.
326          *
327          * If the transaction only includes remote updates, it will
328          * send updates to remote MDT in osp_trans_stop.
329          *
330          * If it is remote create, it will send the remote req after
331          * local transaction. i.e. create the object locally first,
332          * then insert the name entry.
333          *
334          * If it is remote unlink, it will send the remote req before
335          * the local transaction, i.e. delete the name entry remote
336          * first, then destroy the local object. */
337         if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
338                 rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
339                                        false);
340
341         return rc;
342 }
343
344 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
345                    struct thandle *th)
346 {
347         struct thandle_update           *tu = th->th_update;
348         struct dt_update_request        *dt_update;
349         int rc = 0;
350
351         LASSERT(tu != NULL);
352         LASSERT(tu != LP_POISON);
353         /* Check whether there are updates related with this OSP */
354         dt_update = out_find_update(tu, dt);
355         if (dt_update == NULL) {
356                 if (!is_only_remote_trans(th))
357                         return rc;
358                 goto put;
359         }
360
361         if (dt_update->dur_req->ourq_count == 0) {
362                 out_destroy_update_req(dt_update);
363                 goto put;
364         }
365
366         if (is_only_remote_trans(th)) {
367                 if (th->th_result == 0) {
368                         struct osp_device *osp = dt2osp_dev(th->th_dev);
369                         struct client_obd *cli = &osp->opd_obd->u.cli;
370
371                         rc = obd_get_request_slot(cli);
372                         if (!osp->opd_imp_active || osp->opd_got_disconnected) {
373                                 if (rc == 0)
374                                         obd_put_request_slot(cli);
375
376                                 rc = -ENOTCONN;
377                         }
378
379                         if (rc != 0) {
380                                 out_destroy_update_req(dt_update);
381                                 goto put;
382                         }
383
384                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
385                                                dt_update, th, true);
386                         if (rc != 0)
387                                 obd_put_request_slot(cli);
388                 } else {
389                         rc = th->th_result;
390                         out_destroy_update_req(dt_update);
391                 }
392         } else {
393                 if (tu->tu_sent_after_local_trans)
394                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
395                                                dt_update, th, false);
396                 rc = dt_update->dur_rc;
397                 out_destroy_update_req(dt_update);
398         }
399
400 put:
401         thandle_put(th);
402         return rc;
403 }