質問者
Register-ObjectEventで登録した非同期プロセスオブジェクトの標準出力書き込みイベントが実際の書き込み順から逸脱してランダムな順序で起きます。

質問
-
非同期でプロセスを実行する[System.Diagnostics.Process]クラスを使用して、起動したプロセスの標準出力に書き込まれた文字列をRegister-ObjectEventコマンドレットでOutputDataReceivedイベントに関連付けしたスクリプトブロックで読み取って、プロセスの標準入力への書き込みと同じタイミングで出力したいと思っています。このため、以下のテスト用の関数を用意しました。
function Invoke-Process { [CmdletBinding()] [OutputType( [string[]] )] Param( #実行ファイルのパス。 [Parameter( Mandatory = $true )] [ValidateNotNullOrEmpty()] [string] $FilePath, #コマンドライン引数。 [AllowEmptyString()] [string] $Arguments = '', #プロセスの標準入力に渡す文字列。 [Parameter( ValueFromPipeline = $true )] [AllowNull()] [AllowEmptyString()] [AllowEmptyCollection()] [string[]] $StdIn = ( New-Object -TypeName ( [string[]].FullName ) -ArgumentList ( 0 ) ) ) begin { #プロセス実行パラメーターを取得する。 function local:getProcessStartInfo { [OutputType( [System.Diagnostics.ProcessStartInfo] )] Param( [string] $filePath, [string] $arguments ) $local:result = New-Object -TypeName ( [System.Diagnostics.ProcessStartInfo].FullName ) $result.FileName = $filePath $result.Arguments = $arguments $result.CreateNoWindow = $true $result.UseShellExecute = $false $result.RedirectStandardInput = $true $result.RedirectStandardOutput = $true return $result } #FIFOバッファの要素を取り出す。 function local:takeFromFifoBuffer { [OutputType( [string] )] Param( [AllowNull()] [AllowEmptyString()] [AllowEmptyCollection()] [System.Collections.Generic.List[string]] $fifoBuffer ) $local:result = '' if ( 0 -ne $fifoBuffer.Count ) { $result = $fifoBuffer[ 0 ] [void] ( $fifoBuffer.RemoveAt( 0 ) ) } else { [void] ( Write-Error -Message 'FIFOバッファに要素がありません。' ) } return $result } #標準出力の内容を出力する。 function local:outputStdOut { [OutputType( [string[]] )] Param( [AllowNull()] [AllowEmptyString()] [AllowEmptyCollection()] [System.Collections.Generic.List[string]] $stdOutFifoBuffer ) while ( 0 -ne $stdOutFifoBuffer.Count ) { takeFromFifoBuffer -fifoBuffer $stdOutFifoBuffer } } $local:processStartInfo = getProcessStartInfo -filePath $FilePath -arguments $Arguments #プロセス実行パラメーター。 $local:stdOutFifoBuffer = New-Object -TypeName ( [System.Collections.Generic.List[string]].FullName ) #標準出力の内容を格納するFIFOバッファ。 $local:stdOutEventName = 'Invoke-Process.OutputStdOut' #標準出力への書き込みイベントの名前。 #標準出力への書き込みイベントに関連付けられたアクション。 $local:outputEventAction = { [void] ( $Event.MessageData.Add( $EventArgs.Data ) ) } #プロセスオブジェクト。書き込みイベントが検出される。 $local:process = New-Object -TypeName ( [System.Diagnostics.Process].FullName ) $process.StartInfo = $processStartInfo [void] ( Register-ObjectEvent -InputObject $process -EventName 'OutputDataReceived' -SourceIdentifier $stdOutEventName -Action $outputEventAction -MessageData $stdOutFifoBuffer ) #プロセスを実行する。 [void] ( $process.Start() ) [void] ( $process.BeginOutputReadLine() ) } process { #標準入力に渡した1行に対応する標準出力のデータを出力する。 foreach ( $local:stdInLine in $StdIn ) { [void] ( $process.StandardInput.WriteLine( $stdInLine ) ) outputStdOut -stdOutFifoBuffer $stdOutFifoBuffer } } end { try { #標準入力を閉じプロセスの終了を待つ。 [void] ( $process.StandardInput.Close() ) [void] ( $process.WaitForExit() ) } catch { if ( -not $process.HasExited ) { [void] ( $process.Kill() ) } } finally { #FIFOバッファに残っているデータを全て出力する。 outputStdOut -stdOutFifoBuffer $stdOutFifoBuffer #終了手続き。 [void] ( Unregister-Event -SourceIdentifier $stdOutEventName ) [void] ( $process.Close() ) } } }
この関数は、起動したプロセスの標準入力に渡したい文字列をパイプラインで入力できるようになっていて、パイプラインで入力された1行を標準入力に書き込むたびにそれまでに標準出力から出力された文字列が蓄積されたFIFOバッファ([System.Collections.Generic.List[string]]コレクション)の中身をこの関数の出力として放出します。[System.Collections.Generic.List[string]]コレクションをFIFOバッファとして使うために関数内関数takeFromFifoBufferでは、コレクションの最初の要素を文字列として取り出した後はその要素を削除するようにしています。こうすることで、コンソールアプリの標準入力にGB単位の巨大なテキストを入力してまた標準出力から巨大なテキストを出力させても必要最小限のメモリ消費で済みます。
ところがこれを実際に動かしてみると、出力された各行が本来の順序ではなくランダムに入れ替わったものになってしまいました。例えば、以下のようにしてコマンドプロンプトのdirコマンドを実行させた場合です。
[string[]] ( 'dir /a /o:en', 'exit' ) | Invoke-Process -FilePath 'cmd.exe'
本来この出力は、以下のようにフォルダー名順にソートされたものとなるはずです。
C:\Users\********\Desktop\t\folders>dir /a /o:en ドライブ C のボリューム ラベルは Windows です ボリューム シリアル番号は A010-80C2 です C:\Users\********\Desktop\t\folders のディレクトリ 2018/11/08 17:16 <DIR> . 2018/11/08 17:16 <DIR> .. 2018/11/08 17:16 <DIR> 0 2018/11/08 17:16 <DIR> 1 2018/11/08 17:16 <DIR> 2 2018/11/08 17:16 <DIR> 3 2018/11/08 17:16 <DIR> 4 2018/11/08 17:16 <DIR> 5 2018/11/08 17:16 <DIR> 6 2018/11/08 17:16 <DIR> 7 2018/11/08 17:16 <DIR> 8 2018/11/08 17:16 <DIR> 9 2018/11/08 17:15 <DIR> a 2018/11/08 17:15 <DIR> b 2018/11/08 17:15 <DIR> c 2018/11/08 17:15 <DIR> d 2018/11/08 17:16 <DIR> e 2018/11/08 17:16 <DIR> f 0 個のファイル 0 バイト 18 個のディレクトリ 446,996,131,840 バイトの空き領域
ところが実際には以下のように各行がランダムに入れ替わってしまっています。
Microsoft Windows [Version 10.0.16299.64] (c) 2017 Microsoft Corporation. All rights reserved. C:\Users\********\Desktop\t\folders>dir /a /o:en ドライブ C のボリューム ラベルは Windows です 2018/11/08 17:16 <DIR> . 2018/11/08 17:16 <DIR> .. C:\Users\********\Desktop\t\folders のディレクトリ ボリューム シリアル番号は A010-80C2 です 2018/11/08 17:16 <DIR> 1 2018/11/08 17:16 <DIR> 2 2018/11/08 17:16 <DIR> 4 2018/11/08 17:16 <DIR> 7 2018/11/08 17:15 <DIR> a 2018/11/08 17:15 <DIR> c 2018/11/08 17:16 <DIR> 9 2018/11/08 17:15 <DIR> d 2018/11/08 17:16 <DIR> e 2018/11/08 17:15 <DIR> b 2018/11/08 17:16 <DIR> 8 0 個のファイル 0 バイト 2018/11/08 17:16 <DIR> f 2018/11/08 17:16 <DIR> 3 18 個のディレクトリ 446,975,279,104 バイトの空き領域 2018/11/08 17:16 <DIR> 6 2018/11/08 17:16 <DIR> 0 C:\Users\********\Desktop\t\folders>exit 2018/11/08 17:16 <DIR> 5
これは、[System.Diagnostics.Process]オブジェクトのOutputDataReceivedイベントに関連付けしたスクリプトブロック
[void] ( $Event.MessageData.Add( $EventArgs.Data ) )
が、実際の標準出力への書き込みのタイミングよりも遅延して、しかもバラバラのタイミングで実行されてしまっていることを意味します。この現象からは、イベントの発生そのものが時間的にズレたのかそれともスクリプトブロックの実行がイベントの発生から遅れてしまったのか見当がつきません。
当方では、この関数に標準エラー出力や終了コードの採取、実行時のカレントフォルダーやユーザーの指定などの機能を付け加えて、最終的にStart-Processコマンドレットや&演算子よりもより使いやすい関数の作成を目指しています。しかし、この症状のため先行きが見通せず大変困っています。
どなたか解決策のご教示よろしくお願いします。
- 編集済み fzok4234 2018年11月8日 11:46 関数コードの不適切な個所を削除
すべての返信
-
当方環境(PSVersion:5.1.17134.228)で実行してみたところ、出力がランダムで入れ替わるという現象は発生しませんでした。
ただ、Register-ObjectEventコマンドレットの-Actionパラメータで指定したスクリプトブロックは、非同期に呼び出され、並列実行されているようです。
このことは-Actionで指定するスクリプトブロック内で適宜Sleepすると、出力行数×Sleep時間以内に処理が完了することから判断できます。
となるとおそらくスクリプトブロックはマルチスレッドで実行されており、実行順が保証されるものではないと考えられます。アプローチを変える必要があるかもしれません。
-
結局、プロセスオブジェクトのStandardOutputまたはStandardErrorプロパティのストリームリーダーの非同期読み取りメソッドであるReadLineAsync()を使って[System.Threading.Tasks.Task[string]]オブジェクトを取得して、これを操作するという方法に落ち着きました。以下のように、このタスクオブジェクトで読み取りがまだ完了していないときは一旦これを変数に保存して保留状態として、後で再度読み取りを試みるというやり方です。今のところこのコードで問題なさそうです。
function script:Invoke-Process { [CmdletBinding( PositionalBinding = $true )] [OutputType( [string[]] )] Param( #実行するファイルのパス。 [Parameter( Mandatory = $true, Position = 0, HelpMessage = '実行するファイルのパスを入力してください。' )] [ValidateNotNullOrEmpty()] [string] $FilePath, #コマンドライン引数。 [Parameter( Position = 1 )] [AllowNull()] [AllowEmptyString()] [AllowEmptyCollection()] [string[]] $ArgumentList = ( [string[]]::new( 0 ) ), #実行するファイルに関連付けられたVerb。 [AllowEmptyString()] [string] $Verb = '', #カレントフォルダー。 [ValidateNotNullOrEmpty()] [string] $WorkingDirectory = ( ( Get-Location -PSProvider 'FileSystem' ).ProviderPath ), #標準入力へ書き込む文字列。 [Parameter( ValueFromPipeline = $true )] [AllowNull()] [AllowEmptyString()] [AllowEmptyCollection()] [string[]] $StdIn = ( [string[]]::new( 0 ) ), #標準エラー出力の出力先配列。 [AllowNull()] [AllowEmptyString()] [AllowEmptyCollection()] [System.Collections.Generic.List[string]] $StdErr = $null, #終了コードの出力先整数( [int] )への参照。 [ref] $ExitCode ) begin { #標準入出力の各ストリームタイプ。 enum stdIOStreamType { StdIn = 0 StdOut = 1 StdErr = 2 } #標準入出力の情報クラス。 class stdIOInfo { #タイプ。 [stdIOStreamType] $StreamType #標準出力または標準エラー出力のストリームリーダー。 [System.IO.StreamReader] $StreamReader #読み取り保留中の標準出力および標準エラー出力の読み取りタスク。 [System.Threading.Tasks.Task[string]] $PendingTask #標準出力および標準エラー出力の読み取りが完了しているかどうかのフラグ。 [bool] $IsCompletedReading #出力先文字配列の参照。 [System.Collections.Generic.List[string]] $OutputList #コンストラクター。 stdIOInfo( [stdIOStreamType] $initialStreamType, [System.IO.StreamReader] $initialStreamReader, [System.Collections.Generic.List[string]] $initialOutputList ) { $this.StreamType = $initialStreamType $this.StreamReader = $initialStreamReader $this.PendingTask = $null $this.IsCompletedReading = $false $this.OutputList = $initialOutputList } } #プロセス実行パラメーターを取得する。 function local:getProcessStartInfo { [OutputType( [System.Diagnostics.ProcessStartInfo] )] Param( #実行ファイルのパス。 [string] $filePath, #引数の文字列。 [AllowEmptyString()] [string] $arguments, #Verb。 [AllowEmptyString()] [string] $verb, #カレントフォルダーのパス。 [string] $workingDirectory ) #戻り値のオブジェクト。 $local:result = [System.Diagnostics.ProcessStartInfo]::new() #プロパティを設定する。 $result.FileName = $filePath $result.Arguments = $arguments $result.Verb = $verb $result.WorkingDirectory = $workingDirectory $result.CreateNoWindow = $true $result.UseShellExecute = $false $result.RedirectStandardInput = $true $result.RedirectStandardOutput = $true $result.RedirectStandardError = $true return $result } #カレントフォルダーのパスの存在チェックと絶対パスへの変換を行う。 function local:convertWorkingDirectory { [OutputType( [string] )] Param( #変換を行うカレントフォルダーのパス。 [string] $workingDirectory ) #戻り値。 $local:result = '' #パスの存在チェックを行い絶対パスに変換する。 if ( Test-Path -LiteralPath $workingDirectory -PathType Container ) { $result = Convert-Path -LiteralPath $workingDirectory } else { [void] ( Write-Error -Category InvalidArgument -Message ( 'カレントフォルダー "' + $workingDirectory + '" が存在しません。' ) ) } return $result } #読み取った標準出力または標準エラー出力の1行を出力する。 function local:outputStdIOLine { [OutputType( [string], [void] )] Param( #出力する1行の文字列。 [AllowEmptyString()] [string] $line, #標準入出力のタイプ。 [stdIOStreamType] $stdIOType, #標準エラー出力の出力先への参照。 [AllowNull()] [AllowEmptyString()] [AllowEmptyCollection()] [System.Collections.Generic.List[string]] $outputList ) #標準入出力の種類を判別する。 switch ( $stdIOType ) { #標準出力。 ( [stdIOStreamType]::StdOut ) { #文字列をそのまま出力する。 $line break } #標準エラー出力。 ( [stdIOStreamType]::StdErr ) { #参照の指定の有無を確認し参照の指定があるときのみ参照先へ出力する。 if ( $null -ne $outputList ) { [void] ( $outputList.Add( $line ) ) } break } #それ以外はエラーとする。 default { [void] ( Write-Error -Category InvalidArgument -Message '標準入力は指定できません。' ) break } } } #プロセスの標準入力または標準エラー入力をできるだけ読み取って出力する。 function local:outputStdIO { [OutputType( [string[]], [void] )] Param( #標準出力または標準エラー出力の情報オブジェクト。 [stdIOInfo] $info ) #読み取った文字列から全て読み取りが完了したかどうかを判別して出力する。 function local:outputOrCompleteReading { [OutputType( [string], [void] )] Param( #読み取った文字列。読み取りが完了すればNullとなる。 [AllowNull()] [AllowEmptyString()] [System.Object] $line, #標準出力または標準エラー出力の情報オブジェクト。 [stdIOInfo] $info ) #読み取った文字列がNullかどうか確かめる。 if ( $null -eq $line ) { #Nullすなわち読み取りが全て完了したとき。 #読み取り完了フラグを有効にして何も出力しない。 $info.IsCompletedReading = $true } else { #まだ読み取りが全て完了していないとき。 #文字列を出力する。 outputStdIOLine -line $line -stdIOType $info.StreamType -outputList $info.OutputList } } #まだ読み取りが完了していないときのみ実行する。 if ( -not $info.IsCompletedReading ) { #保留中タスクが存在する場合は先にこれを片付ける。 if ( $null -ne $info.PendingTask ) { #タスクの完了を確認する。 if ( $info.PendingTask.IsCompleted ) { #タスクが完了したとき。 #読み取った文字列を出力しタスクを閉じる。 outputOrCompleteReading -line $info.PendingTask.Result -info $info [void] ( $info.PendingTask.Dispose() ) $info.PendingTask = $null #読み取りが全て終了していればこれ以上の処理はしない。 if ( $info.IsCompletedReading ) { return } #読み取りが全て終了していなければ次の処理へ進む。 } else { #まだタスクが完了していないとき。 #次の機会に持ち越すため一旦終了する。 return } } #新しい行を次々と読み込み出力する。 while ( $true ) { #新たな1行の読み取りタスク。 [System.Threading.Tasks.Task[string]] $local:readingTask = $info.StreamReader.ReadLineAsync() #タスクの完了を確認する。 if ( $readingTask.IsCompleted ) { #タスクが完了したとき。 #読み取った文字列を出力しタスクを閉じる。 outputOrCompleteReading -line $readingTask.Result -info $info [void] ( $readingTask.Dispose() ) #読み取りが全て終了していればこれ以上の処理はしない。 if ( $info.IsCompletedReading ) { return } #読み取りが全て終了していなければ次の行へ進む。 } else { #まだタスクが完了していないとき。 #タスクを一旦保留して終了する。 $info.PendingTask = $readingTask return } } } } #引数文字列。 $local:arguments = $ArgumentList -join ' ' #カレントフォルダー。 $local:convertedWorkingDirectory = convertWorkingDirectory -workingDirectory $WorkingDirectory #プロセス実行パラメーター。 $local:processStartInfo = getProcessStartInfo -filePath $FilePath -arguments $arguments -verb $Verb -workingDirectory $convertedWorkingDirectory #プロセスオブジェクト。 $local:process = [System.Diagnostics.Process]::new() $process.StartInfo = $processStartInfo #プロセスを実行する。 [void] ( $process.Start() ) #標準エラー出力を保存する配列を全消去する。 if ( $null -ne $StdErr ) { [void] ( $StdErr.Clear() ) } #標準出力と標準エラー出力の情報オブジェクト。 $local:stdOutInfo = [stdIOInfo]::new( ( [stdIOStreamType]::StdOut ), $process.StandardOutput, $null ) $local:stdErrInfo = [stdIOInfo]::new( ( [stdIOStreamType]::StdErr ), $process.StandardError, $StdErr ) } process { #標準入力に渡す文字列を取得する。 foreach ( $local:stdInLine in $StdIn ) { #各行を標準入力に渡す。 [void] ( $process.StandardInput.WriteLine( $stdInLine ) ) #標準入力に渡した1行に対応する標準出力および標準エラー出力のデータを出力する。 outputStdIO -info $stdOutInfo [void] ( outputStdIO -info $stdErrInfo ) } } end { #プロセス終了待ち中に強制終了することを想定したブロック。 try { #標準入力を閉じる。 [void] ( $process.StandardInput.Close() ) #読み取りが完了するまで標準出力および標準エラー出力のデータ出力を続ける。 while ( -not ( $stdOutInfo.IsCompletedReading -and $stdErrInfo.IsCompletedReading ) ) { outputStdIO -info $stdOutInfo [void] ( outputStdIO -info $stdErrInfo ) } #読み取り完了後もプロセスの終了を待つ。 [void] ( $process.WaitForExit() ) } catch { #プロセスがまだ起動中ならば強制終了させる。 if ( -not $process.HasExited ) { [void] ( $process.Kill() ) } } finally { #終了コードの引数の存在を確認してから終了コードを出力する。 if ( $MyInvocation.BoundParameters.ContainsKey( 'ExitCode' ) ) { $ExitCode.Value = $process.ExitCode } #終了手続き。 [void] ( $stdOutInfo.StreamReader.Close() ) [void] ( $stdErrInfo.StreamReader.Close() ) [void] ( $process.Close() ) } } }