// Horizon – Rev 16
using Horizon.Snapshots;
using Horizon.Utilities;
using Lucene.Net.Analysis.Standard;
using Lucene.Net.Documents;
using Lucene.Net.Index;
using Lucene.Net.QueryParsers;
using Lucene.Net.Search;
using Lucene.Net.Store;
using Serilog;
using System;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Tesseract;
namespace Horizon.Searching
{
/// <summary>
/// Full-text index of <see cref="Snapshot"/> notes, names and paths, kept in a Lucene.Net index
/// under <c>Horizon.Constants.SearchIndexDirectory</c>. Snapshots are queued with
/// <see cref="Index"/> and written by a TPL Dataflow pipeline; <see cref="Search"/> returns the
/// hashes of matching snapshots.
/// </summary>
/// <remarks>Searching and indexing are culture-specific and case-insensitive.</remarks>
public class SearchEngine : IDisposable
{
    #region Private Fields
    private TesseractEngine _tesseractEngine;
    private FSDirectory _indexDirectory;
    private IndexWriter _indexWriter;
    private StandardAnalyzer _standardAnalyzer;
    private Configuration.Configuration _configuration;
    private CancellationToken _cancellationToken;
    private CancellationTokenSource _rebuildIndexCancellationTokenSource;
    private CancellationToken _indexServerMessageCancellationToken;
    private static HttpClient _httpClient;
    private SemaphoreSlim _tesseractSemaphoreSlim;
    private readonly ScheduledContinuation _indexerCommitScheduledContinuation;
    private readonly BufferBlock<Snapshot> _indexingBufferBlock;
    private IDisposable[] _indexServerMessageLinks;

    // Matches runs of ASCII letters/digits. The optional trailing group also captures the
    // character following a one-character word that is directly preceded by '.', e.g. "c+" from
    // "main.c++" -- so matches may contain query-syntax characters.
    private static readonly Regex _splitWordRegex = new Regex(@"((\b[a-zA-Z0-9]+\b)((?<=\.\w).)?)", RegexOptions.Compiled | RegexOptions.IgnoreCase);
    #endregion

    #region Constructors, Destructors and Finalizers
    /// <summary>
    /// Ensures the application and index directories exist, opens (or recreates) the Lucene
    /// index and wires the buffer -> action block indexing pipeline.
    /// </summary>
    private SearchEngine()
    {
        _indexerCommitScheduledContinuation = new ScheduledContinuation();
        _tesseractSemaphoreSlim = new SemaphoreSlim(1, 1);

        // CreateDirectory is a no-op when the directory already exists.
        System.IO.Directory.CreateDirectory(global::Horizon.Constants.UserApplicationDirectory);
        _tesseractEngine = new TesseractEngine(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "tessdata"), "eng", EngineMode.Default);

        System.IO.Directory.CreateDirectory(global::Horizon.Constants.SearchIndexDirectory);
        _indexDirectory = FSDirectory.Open(global::Horizon.Constants.SearchIndexDirectory);

        // create == true makes the IndexWriter start a fresh (empty) index.
        var create = IndexRequiresCreation(_indexDirectory);
        _standardAnalyzer = new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30);
        _indexWriter = new IndexWriter(_indexDirectory, _standardAnalyzer, create, IndexWriter.MaxFieldLength.UNLIMITED);

        // Snapshots are posted to the buffer block and consumed by the action block.
        var indexingActionBlock = new ActionBlock<Snapshot>(
            snapshot => IndexSnapshot(snapshot),
            new ExecutionDataflowBlockOptions { CancellationToken = _indexServerMessageCancellationToken, EnsureOrdered = false });
        _indexingBufferBlock = new BufferBlock<Snapshot>(new DataflowBlockOptions { EnsureOrdered = false, CancellationToken = _indexServerMessageCancellationToken });
        _indexServerMessageLinks = new[] {
            _indexingBufferBlock.LinkTo(indexingActionBlock, new DataflowLinkOptions { PropagateCompletion = true })
        };
    }

    /// <summary>
    /// Creates a search engine.
    /// </summary>
    /// <param name="configuration">Application configuration (stored; not read by the code in this class).</param>
    /// <param name="cancellationToken">Token handed to the scheduled index commits.</param>
    public SearchEngine(Configuration.Configuration configuration, CancellationToken cancellationToken) : this()
    {
        _configuration = configuration;
        _cancellationToken = cancellationToken;
    }

    /// <summary>
    /// Stops accepting snapshots and releases the Lucene, Tesseract, semaphore and HTTP resources.
    /// Fields are nulled after disposal so repeated calls are harmless.
    /// </summary>
    public void Dispose()
    {
        // Decline further snapshots before the writer they would be written to goes away.
        _indexingBufferBlock.Complete();

        // Never assigned in this class; guard so Dispose cannot throw NullReferenceException.
        _rebuildIndexCancellationTokenSource?.Cancel();

        if (_indexServerMessageLinks != null)
        {
            foreach (var link in _indexServerMessageLinks)
            {
                link.Dispose();
            }
            _indexServerMessageLinks = null;
        }
        if (_indexWriter != null)
        {
            _indexWriter.Dispose();
            _indexWriter = null;
        }
        if (_standardAnalyzer != null)
        {
            _standardAnalyzer.Dispose();
            _standardAnalyzer = null;
        }
        if (_indexDirectory != null)
        {
            _indexDirectory.Dispose();
            _indexDirectory = null;
        }
        if (_tesseractEngine != null)
        {
            _tesseractEngine.Dispose();
            _tesseractEngine = null;
        }
        if (_tesseractSemaphoreSlim != null)
        {
            _tesseractSemaphoreSlim.Dispose();
            _tesseractSemaphoreSlim = null;
        }
        if (_httpClient != null)
        {
            _httpClient.Dispose();
            _httpClient = null;
        }
    }
    #endregion

    #region Private Methods
    /// <summary>
    /// Decides whether the on-disk index must be recreated: true when it cannot be opened,
    /// contains no documents, or lacks any field this class writes.
    /// </summary>
    /// <param name="directory">Directory holding the index.</param>
    /// <returns>True if a fresh index should be created.</returns>
    private static bool IndexRequiresCreation(FSDirectory directory)
    {
        try
        {
            using var indexReader = IndexReader.Open(directory, true);
            if (indexReader.NumDocs() <= 0)
            {
                return true;
            }

            var indexFields = new HashSet<string>(indexReader.GetFieldNames(IndexReader.FieldOption.ALL), StringComparer.OrdinalIgnoreCase);
            // Exactly the fields written by IndexSnapshot; an index missing any of them is stale.
            var usedFields = new[] { Constants.FIELDS.NOTE, Constants.FIELDS.NAME, Constants.FIELDS.PATH, Constants.FIELDS.HASH };
            return !indexFields.IsSupersetOf(usedFields);
        }
        catch (Exception exception)
        {
            // Typically there is simply no index yet; start a new one.
            Log.Warning(exception, "Search index could not be opened and will be recreated");
            return true;
        }
    }

    /// <summary>
    /// Splits <paramref name="text"/> into distinct lower-cased words (via <see cref="_splitWordRegex"/>)
    /// joined by single spaces. Returns an empty string for null or empty input.
    /// </summary>
    private static string Tokenize(string text)
    {
        if (string.IsNullOrEmpty(text))
        {
            return string.Empty;
        }

        var words = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
        foreach (Match match in _splitWordRegex.Matches(text))
        {
            words.Add(match.Value.ToLower(CultureInfo.CurrentCulture));
        }
        return string.Join(" ", words);
    }

    /// <summary>
    /// Normalizes a snapshot hash the same way for indexing and lookup.
    /// </summary>
    private static string NormalizeHash(string hash) => hash.ToLower(CultureInfo.CurrentCulture);

    /// <summary>
    /// Builds a Lucene document from <paramref name="snapshot"/>, adds it to the index and schedules
    /// <see cref="CommitOptimize"/> one second later. Exceptions are logged instead of rethrown so a
    /// single failing snapshot does not fault the ActionBlock.
    /// </summary>
    private void IndexSnapshot(Snapshot snapshot)
    {
        try
        {
            Log.Debug($"Indexing snapshot {snapshot.Hash}");
            var document = new Document();
            document.Add(new Field(Constants.FIELDS.NOTE, Tokenize(snapshot.Note), Field.Store.YES, Field.Index.ANALYZED));
            document.Add(new Field(Constants.FIELDS.NAME, Tokenize(snapshot.Name), Field.Store.YES, Field.Index.ANALYZED));
            document.Add(new Field(Constants.FIELDS.PATH, Tokenize(snapshot.Path), Field.Store.YES, Field.Index.ANALYZED));
            // NOT_ANALYZED: the hash is stored as a single exact term for IsIndexed lookups.
            document.Add(new Field(Constants.FIELDS.HASH, NormalizeHash(snapshot.Hash), Field.Store.YES, Field.Index.NOT_ANALYZED));
            _indexWriter.AddDocument(document);
            _indexerCommitScheduledContinuation.Schedule(TimeSpan.FromSeconds(1), CommitOptimize, _cancellationToken);
        }
        catch (Exception exception)
        {
            Log.Error(exception, "Exception thrown while indexing message");
        }
    }

    /// <summary>
    /// Commits pending index changes and optimizes (merges) the index.
    /// Does nothing once the engine has been disposed.
    /// </summary>
    private void CommitOptimize()
    {
        // Read the field once: Dispose may null it while this callback is still pending.
        var indexWriter = _indexWriter;
        if (indexWriter == null)
        {
            return;
        }
        indexWriter.Commit();
        indexWriter.Optimize();
    }
    #endregion

    #region Public Methods
    /// <summary>
    /// Deletes all indexed documents and commits the deletion.
    /// </summary>
    public void TrashIndex()
    {
        _indexWriter.DeleteAll();
        _indexWriter.Commit();
    }

    /// <summary>
    /// Queues <paramref name="snapshot"/> for indexing by the background pipeline.
    /// </summary>
    /// <param name="snapshot">Snapshot whose note, name, path and hash are indexed.</param>
    /// <param name="cancellationToken">Cancels waiting for the queue to accept the snapshot.</param>
    /// <remarks>Searching and indexing are culture-specific and case-insensitive.</remarks>
    /// <returns>A task that completes once the queue has accepted or declined the snapshot; indexing happens later.</returns>
    public async Task Index(Snapshot snapshot, CancellationToken cancellationToken)
    {
        await _indexingBufferBlock.SendAsync(snapshot, cancellationToken);
    }

    /// <summary>
    /// Checks whether a document with the snapshot's hash is present in the index
    /// (including changes not yet committed).
    /// </summary>
    /// <param name="snapshot">Snapshot to look up by hash.</param>
    /// <returns>True if at least one indexed document carries the snapshot's hash.</returns>
    public bool IsIndexed(Snapshot snapshot)
    {
        using var indexReader = _indexWriter.GetReader();
        using var indexSearcher = new IndexSearcher(indexReader);
        // Normalize like IndexSnapshot does, otherwise upper-case hashes never match.
        var hashQuery = new TermQuery(new Term(Constants.FIELDS.HASH, NormalizeHash(snapshot.Hash)));
        var topDocs = indexSearcher.Search(hashQuery, 1);
        return topDocs.TotalHits != 0;
    }

    /// <summary>
    /// Finds snapshots whose note, name or path matches any word of <paramref name="text"/>.
    /// </summary>
    /// <param name="text">Free-form search text; split into words with the same regex used for indexing.</param>
    /// <param name="count">Maximum number of results.</param>
    /// <remarks>Searching and indexing are culture-specific and case-insensitive.</remarks>
    /// <returns>Hashes of the best-scoring snapshots; empty when <paramref name="text"/> is null/empty or <paramref name="count"/> is not positive.</returns>
    public IEnumerable<string> Search(string text, int count)
    {
        // Lucene rejects a non-positive hit count, and there is nothing to match in empty text.
        if (string.IsNullOrEmpty(text) || count <= 0)
        {
            yield break;
        }

        using var indexReader = _indexWriter.GetReader();
        using var indexSearcher = new IndexSearcher(indexReader);

        var noteQueryParser = new QueryParser(Lucene.Net.Util.Version.LUCENE_30, Constants.FIELDS.NOTE, _standardAnalyzer);
        var nameQueryParser = new QueryParser(Lucene.Net.Util.Version.LUCENE_30, Constants.FIELDS.NAME, _standardAnalyzer);
        var pathQueryParser = new QueryParser(Lucene.Net.Util.Version.LUCENE_30, Constants.FIELDS.PATH, _standardAnalyzer);

        var booleanQuery = new BooleanQuery();
        foreach (Match match in _splitWordRegex.Matches(text))
        {
            // Escape: regex matches such as "c(" would otherwise throw ParseException.
            var searchText = QueryParser.Escape(match.Value.ToLower(CultureInfo.CurrentCulture));
            booleanQuery.Add(noteQueryParser.Parse(searchText), Occur.SHOULD);
            booleanQuery.Add(nameQueryParser.Parse(searchText), Occur.SHOULD);
            booleanQuery.Add(pathQueryParser.Parse(searchText), Occur.SHOULD);
        }

        var topDocs = indexSearcher.Search(booleanQuery, count);
        foreach (var doc in topDocs.ScoreDocs.Select(top => top.Doc))
        {
            var document = indexSearcher.Doc(doc);
            if (document == null)
            {
                Log.Warning("Search return empty document");
                continue;
            }
            var hashText = document.Get(Constants.FIELDS.HASH);
            if (string.IsNullOrEmpty(hashText))
            {
                Log.Warning("Failed to retrieve snapshot hash");
                continue;
            }
            yield return hashText;
        }
    }
    #endregion
}
}