mirror of
				https://github.com/ansible-collections/community.general.git
				synced 2025-10-26 13:56:09 -07:00 
			
		
		
		
	* Added from_csv filter and integration tests * Cleaning up whitespace * Adding changelog fragment * Updated changelog fragment name * Removed temp fragment * Refactoring csv functions Part 1 * Syncing refactored csv modules/filters * Adding unit tests for csv Module_Util * Updating changelog fragment * Correcting whitespace in unit test * Improving changelog fragment Co-authored-by: Felix Fontein <felix@fontein.de> * Update changelogs/fragments/2037-add-from-csv-filter.yml Co-authored-by: Felix Fontein <felix@fontein.de>
		
			
				
	
	
		
			67 lines
		
	
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			67 lines
		
	
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| # Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) <ajpantuso@gmail.com>
 | |
| # Copyright: (c) 2018, Dag Wieers (@dagwieers) <dag@wieers.com>
 | |
| # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
 | |
| 
 | |
| from __future__ import absolute_import, division, print_function
 | |
| __metaclass__ = type
 | |
| 
 | |
| import csv
 | |
| from io import BytesIO, StringIO
 | |
| 
 | |
| from ansible.module_utils._text import to_native
 | |
| from ansible.module_utils.six import PY3
 | |
| 
 | |
| 
 | |
class CustomDialectFailureError(Exception):
    """Signal that a custom csv dialect could not be registered from the supplied options."""
 | |
| 
 | |
| 
 | |
class DialectNotAvailableError(Exception):
    """Signal that the requested dialect name is not registered with the csv module."""
 | |
| 
 | |
| 
 | |
# Alias of the standard library's csv.Error, exposed under this module's namespace
# (presumably so callers can catch parsing errors without importing csv themselves).
CSVError = csv.Error
 | |
| 
 | |
| 
 | |
def initialize_dialect(dialect, **kwargs):
    """Return the name of the csv dialect to use, registering a custom one if needed.

    :param dialect: Name of an already registered csv dialect used as the base.
    :param kwargs: Dialect formatting parameters forwarded to
        ``csv.register_dialect``. Entries whose value is ``None`` are treated
        as unset and ignored.
    :returns: ``dialect`` unchanged when no parameter is set, otherwise the
        string ``'custom'``, which names the newly registered dialect.
    :raises DialectNotAvailableError: If ``dialect`` is not a registered
        dialect name.
    :raises CustomDialectFailureError: If the csv module rejects the supplied
        parameters.
    """
    # Add Unix dialect from Python 3
    class unix_dialect(csv.Dialect):
        """Describe the usual properties of Unix-generated CSV files."""
        delimiter = ','
        quotechar = '"'
        doublequote = True
        skipinitialspace = False
        lineterminator = '\n'
        quoting = csv.QUOTE_ALL

    # Registering under an existing name simply replaces the previous entry,
    # so this is safe to repeat on every call.
    csv.register_dialect("unix", unix_dialect)

    if dialect not in csv.list_dialects():
        raise DialectNotAvailableError("Dialect '%s' is not supported by your version of python." % dialect)

    # Create a dictionary from only set options
    dialect_params = dict((k, v) for k, v in kwargs.items() if v is not None)
    if not dialect_params:
        return dialect

    try:
        csv.register_dialect('custom', dialect, **dialect_params)
    except (TypeError, ValueError) as e:
        # Depending on the interpreter version the csv module reports invalid
        # dialect parameters as TypeError or ValueError; translate both so
        # callers only have to handle CustomDialectFailureError.
        raise CustomDialectFailureError("Unable to create custom dialect: %s" % to_native(e))

    return 'custom'
 | |
| 
 | |
| 
 | |
def read_csv(data, dialect, fieldnames=None):
    """Wrap CSV content in a ``csv.DictReader``.

    A leading Unicode byte order mark is removed first; otherwise it would be
    parsed as part of the first header name (or first field value).

    :param data: The CSV content; it is converted with ``to_native`` before parsing.
    :param dialect: Name of the csv dialect handed to ``csv.DictReader``.
    :param fieldnames: Optional column names; passed through to
        ``csv.DictReader`` unchanged.
    :returns: A ``csv.DictReader`` reading from an in-memory buffer holding ``data``.
    """
    # Build the BOM through to_native as well so that it has the same native
    # string type as ``data`` on both Python 2 and Python 3.
    bom = to_native(u'\ufeff')

    data = to_native(data, errors='surrogate_or_strict')
    if data.startswith(bom):
        data = data[len(bom):]

    # The csv module wants a text buffer on Python 3 and a byte buffer on Python 2.
    if PY3:
        fake_fh = StringIO(data)
    else:
        fake_fh = BytesIO(data)

    return csv.DictReader(fake_fh, fieldnames=fieldnames, dialect=dialect)
 |