Note that there are some explanatory texts on larger screens.

plurals
  1. POMultithreaded file upload synchronization
    primarykey
    data
    text
    <p>Currently I am working on a Delphi XE3 client/server application to transfer files (with the Indy FTP components). The client part monitors a folder, gets a list of the files inside, uploads them to the server and deletes the originals. The uploading is done by a separate thread, which processes files one by one. The files can range from 0 to a few thousand and their sizes also vary a lot.</p> <p>It is a Firemonkey app compiled for both OSX and Windows, so I had to use TThread instead of OmniThreadLibrary, which I preferred. My customer reports that the application randomly freezes. I could not duplicate it, but since I don't have so much experience with TThread, I might have put deadlock condition somewhere. I read quite a lot of examples, but I'm still not sure about some of the multithread specifics.</p> <p>The app structure is simple:<br> A timer in the main thread checks the folder and gets information about each file into a record, which goes into a generic TList. This list keeps information about the names of the files, size, the progress, whether the file is completely uploaded or has to be retried. All that is displayed in a grid with progress bars, etc. This list is accessed only by the main thread. After that the items from the list are sent to the thread by calling the AddFile method (code below). The thread stores all files in a thread-safe queue like this one <a href="http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/">http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/</a><br> When the file is uploaded the uploader thread notifies the main thread with a call to Synchronize.<br> The main thread periodically calls the Uploader.GetProgress method to check the current file progress and display it. This function is not actually thread-safe, but could it cause a deadlock, or only wrong data returned?</p> <p>What would be a safe and efficient way to do the progress check? </p> <p>So is this approach OK or I have missed something? How would you do this?<br> For example I though of making a new thread just to read the folder contents. This means that the TList I use has to be made thread-safe, but it has to be accessed all the time to refresh the displayed info in the GUI grid. Wouldn't all the synchronization just slow down the GUI?</p> <p>I have posted the simplified code below in case someone wants to look at it. If not, I would be happy to hear some opinions on what I should use in general. The main goals are to work on both OSX and Windows; to be able to display information about all the files and the progress of the current one; and to be responsive regardless of the number and size of the files.</p> <p>That's the code of the uploader thread. I have removed some of it for easier reading:</p> <pre><code>type TFileStatus = (fsToBeQueued, fsUploaded, fsQueued); TFileInfo = record ID: Integer; Path: String; Size: Int64; UploadedSize: Int64; Status: TFileStatus; end; TUploader = class(TThread) private FTP: TIdFTP; fQueue: TThreadedQueue&lt;TFileInfo&gt;; fCurrentFile: TFileInfo; FUploading: Boolean; procedure ConnectFTP; function UploadFile(aFileInfo: TFileInfo): String; procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); procedure SignalComplete; procedure SignalError(aError: String); protected procedure Execute; override; public property Uploading: Boolean read FUploading; constructor Create; destructor Destroy; override; procedure Terminate; procedure AddFile(const aFileInfo: TFileInfo); function GetProgress: TFileInfo; end; procedure TUploader.AddFile(const aFileInfo: TFileInfo); begin fQueue.Enqueue(aFileInfo); end; procedure TUploader.ConnectFTP; begin ... FTP.Connect; end; constructor TUploader.Create; begin inherited Create(false); FreeOnTerminate := false; fQueue := TThreadedQueue&lt;TFileInfo&gt;.Create; // Create the TIdFTP and set ports and other params ... end; destructor TUploader.Destroy; begin fQueue.Close; fQueue.Free; FTP.Free; inherited; end; // Process the whole queue and inform the main thread of the progress procedure TUploader.Execute; var Temp: TFileInfo; begin try ConnectFTP; except on E: Exception do SignalError(E.Message); end; // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails while fQueue.Peek(fCurrentFile) = wrSignaled do try if UploadFile(fCurrentFile) = '' then begin fQueue.Dequeue(Temp); // Delete the item from the queue if succesful SignalComplete; end; except on E: Exception do SignalError(E.Message); end; end; // Return the current file's info to the main thread. Used to update the progress indicators function TUploader.GetProgress: TFileInfo; begin Result := fCurrentFile; end; // Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); begin fCurrentFile.UploadedSize := AWorkCount; end; procedure TUploader.SignalComplete; begin Synchronize( procedure begin frmClientMain.OnCompleteFile(fCurrentFile); end); end; procedure TUploader.SignalError(aError: String); begin try FTP.Disconnect; except end; if fQueue.Closed then Exit; Synchronize( procedure begin frmClientMain.OnUploadError(aError); end); end; // Clear the queue and terminate the thread procedure TUploader.Terminate; begin fQueue.Close; inherited; end; function TUploader.UploadFile(aFileInfo: TFileInfo): String; begin Result := 'Error'; try if not FTP.Connected then ConnectFTP; FUploading := true; FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path)); Result := ''; finally FUploading := false; end; end; </code></pre> <p>And parts of the main thread that interact with the uploader:</p> <pre><code>...... // Main form fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted) fUploader: TUploader; // The uploader thread fFiles: TList&lt;TFileInfo&gt;; fCurrentFileName: String; // Used to display the progress function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID public procedure OnCompleteFile(aFileInfo: TFileInfo); procedure OnUploadError(aError: String); end; // This is called by the uploader with Synchronize procedure TfrmClientMain.OnUploadError(aError: String); begin // show and log the error end; // This is called by the uploader with Synchronize procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo); var I: Integer; begin I := IndexOfFile(aFileInfo.ID); if (I &gt;= 0) and (I &lt; fFiles.Count) then begin aFileInfo.Status := fsUploaded; aFileInfo.UploadedSize := aFileInfo.Size; FFiles.Items[I] := aFileInfo; Inc(FFilesUploaded); TFile.Delete(aFileInfo.Path); colProgressImg.UpdateCell(I); end; end; procedure TfrmClientMain.ProcessFolder; var NewFiles: TStringDynArray; I, J: Integer; FileInfo: TFileInfo; begin // Remove completed files from the list if it contains more than XX files while FFiles.Count &gt; 1000 do if FFiles[0].Status = fsUploaded then begin Dec(FFilesUploaded); FFiles.Delete(0); end else Break; NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories); for I := 0 to Length(NewFiles) - 1 do begin FileInfo.ID := FUniqueID; Inc(FUniqueID); FileInfo.Path := NewFiles[I]; FileInfo.Size := GetFileSizeByName(NewFiles[I]); FileInfo.UploadedSize := 0; FileInfo.Status := fsToBeQueued; FFiles.Add(FileInfo); if (I mod 100) = 0 then begin UpdateStatusLabel; grFiles.RowCount := FFiles.Count; Application.ProcessMessages; if fUploader = nil then break; end; end; // Send the new files and resend failed to the uploader thread for I := 0 to FFiles.Count - 1 do if (FFiles[I].Status = fsToBeQueued) then begin if fUploader = nil then Break; FileInfo := FFiles[I]; FileInfo.Status := fsQueued; FFiles[I] := FileInfo; SaveDebug(1, 'Add: ' + ExtractFileName(FFiles[I].Path)); FUploader.AddFile(FFiles[I]); end; end; procedure TfrmClientMain.tmrGUITimer(Sender: TObject); var FileInfo: TFileInfo; I: Integer; begin if (fUploader = nil) or not fUploader.Uploading then Exit; FileInfo := fUploader.GetProgress; I := IndexOfFile(FileInfo.ID); if (I &gt;= 0) and (I &lt; fFiles.Count) then begin fFiles.Items[I] := FileInfo; fCurrentFileName := ExtractFileName(FileInfo.Path); colProgressImg.UpdateCell(I); end; end; function TfrmClientMain.IndexOfFile(aID: Integer): Integer; var I: Integer; begin Result := -1; for I := 0 to FFiles.Count - 1 do if FFiles[I].ID = aID then Exit(I); end; </code></pre>
    singulars
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload